Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/17 09:42:41 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #124: [FLINK-27580] Implement filter pushdown for TableStoreHiveStorageHandler

tsreaper opened a new pull request, #124:
URL: https://github.com/apache/flink-table-store/pull/124

   Filter pushdown is a critical optimization for sources, as it can decrease the number of records to read. Hive provides the `HiveStoragePredicateHandler` interface for this purpose. We need to implement this interface in `TableStoreHiveStorageHandler`.
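   
   For illustration, a minimal hedged sketch of the entry point this requires (the class name is a stand-in, and pushing the whole filter while also keeping it as residual is just the conservative baseline before the actual converter is wired in):
   
   ```java
   import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
   import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
   import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
   import org.apache.hadoop.hive.serde2.Deserializer;
   import org.apache.hadoop.mapred.JobConf;
   
   // Sketch only. Hive calls decomposePredicate() during planning; the handler
   // returns the part of the filter the storage layer will evaluate
   // (pushedPredicate) and the part Hive must still apply (residualPredicate).
   public class PredicateHandlerSketch implements HiveStoragePredicateHandler {
   
       @Override
       public DecomposedPredicate decomposePredicate(
               JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) {
           DecomposedPredicate result = new DecomposedPredicate();
           // Keeping the full predicate as residual is always correct: Hive
           // re-applies it, so the pushed part is purely an optimization.
           // The cast holds for typical filter expressions; a real
           // implementation should check the node type first.
           result.pushedPredicate = (ExprNodeGenericFuncDesc) predicate;
           result.residualPredicate = (ExprNodeGenericFuncDesc) predicate;
           return result;
       }
   }
   ```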
   
   




[GitHub] [flink-table-store] JingsongLi merged pull request #124: [FLINK-27580] Implement filter pushdown for TableStoreHiveStorageHandler

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #124:
URL: https://github.com/apache/flink-table-store/pull/124




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #124: [FLINK-27580] Implement filter pushdown for TableStoreHiveStorageHandler

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #124:
URL: https://github.com/apache/flink-table-store/pull/124#discussion_r874643797


##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.file.predicate.And;
+import org.apache.flink.table.store.file.predicate.Equal;
+import org.apache.flink.table.store.file.predicate.GreaterOrEqual;
+import org.apache.flink.table.store.file.predicate.GreaterThan;
+import org.apache.flink.table.store.file.predicate.IsNotNull;
+import org.apache.flink.table.store.file.predicate.IsNull;
+import org.apache.flink.table.store.file.predicate.LessOrEqual;
+import org.apache.flink.table.store.file.predicate.LessThan;
+import org.apache.flink.table.store.file.predicate.Literal;
+import org.apache.flink.table.store.file.predicate.NotEqual;
+import org.apache.flink.table.store.file.predicate.Or;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Optional;
+
+/** Converts {@link SearchArgument} to {@link Predicate} with best effort. */
+public class SearchArgumentToPredicateConverter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SearchArgumentToPredicateConverter.class);
+
+    private final ExpressionTree tree;
+    private final List<PredicateLeaf> leaves;
+    private final List<String> columnNames;
+    private final List<LogicalType> columnTypes;
+
+    public SearchArgumentToPredicateConverter(
+            SearchArgument sarg, List<String> columnNames, List<LogicalType> columnTypes) {
+        this.tree = sarg.getExpression();
+        this.leaves = sarg.getLeaves();
+        this.columnNames = columnNames;
+        this.columnTypes = columnTypes;
+    }
+
+    public Optional<Predicate> convert() {
+        try {
+            return Optional.of(convertTree(tree));
+        } catch (Throwable t) {

Review Comment:
   I prefer catching a specific exception here, so that unexpected bugs are not silently swallowed.
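   
   For example, a sketch of the narrower catch (assuming `UnsupportedOperationException` is the converter's only "cannot convert" signal, as in the `default` branches above):
   
   ```java
   public Optional<Predicate> convert() {
       try {
           return Optional.of(convertTree(tree));
       } catch (UnsupportedOperationException e) {
           // Only the expected "cannot convert" case falls back to Hive;
           // genuine bugs (NPE, ClassCastException, ...) now surface instead
           // of silently disabling pushdown.
           LOG.warn("Failed to convert predicate. Filter will be processed by Hive instead.", e);
           return Optional.empty();
       }
   }
   ```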



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java:
##########
@@ -0,0 +1,233 @@
+/** Converts {@link SearchArgument} to {@link Predicate} with best effort. */
+public class SearchArgumentToPredicateConverter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SearchArgumentToPredicateConverter.class);
+
+    private final ExpressionTree tree;
+    private final List<PredicateLeaf> leaves;
+    private final List<String> columnNames;
+    private final List<LogicalType> columnTypes;
+
+    public SearchArgumentToPredicateConverter(
+            SearchArgument sarg, List<String> columnNames, List<LogicalType> columnTypes) {
+        this.tree = sarg.getExpression();
+        this.leaves = sarg.getLeaves();
+        this.columnNames = columnNames;
+        this.columnTypes = columnTypes;
+    }
+
+    public Optional<Predicate> convert() {
+        try {
+            return Optional.of(convertTree(tree));
+        } catch (Throwable t) {
+            LOG.warn("Failed to convert predicate. Filter will be processed by Hive instead.", t);
+            return Optional.empty();
+        }
+    }
+
+    private Predicate convertTree(ExpressionTree tree) {
+        List<ExpressionTree> children = tree.getChildren();
+        switch (tree.getOperator()) {
+            case OR:
+                return children.stream().map(this::convertTree).reduce(Or::new).get();
+            case AND:
+                return children.stream().map(this::convertTree).reduce(And::new).get();
+            case NOT:
+                return convertNotTree(children.get(0));
+            case LEAF:
+                return convertLeaf(leaves.get(tree.getLeaf()));
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported operator " + tree.getOperator());
+        }
+    }
+
+    private Predicate convertNotTree(ExpressionTree tree) {
+        List<ExpressionTree> children = tree.getChildren();
+        switch (tree.getOperator()) {
+            case OR:
+                return children.stream().map(this::convertNotTree).reduce(And::new).get();
+            case AND:
+                return children.stream().map(this::convertNotTree).reduce(Or::new).get();
+            case NOT:
+                return convertTree(children.get(0));
+            case LEAF:
+                return convertNotLeaf(leaves.get(tree.getLeaf()));
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported operator " + tree.getOperator());
+        }
+    }
+
+    private Predicate convertLeaf(PredicateLeaf leaf) {
+        String columnName = leaf.getColumnName();
+        int idx = columnNames.indexOf(columnName);
+        Preconditions.checkArgument(idx >= 0, "Column " + columnName + " not found.");
+        LogicalType columnType = columnTypes.get(idx);
+        switch (leaf.getOperator()) {
+            case EQUALS:
+                return new Equal(idx, toLiteral(columnType, leaf.getLiteral()));
+            case LESS_THAN:
+                return new LessThan(idx, toLiteral(columnType, leaf.getLiteral()));
+            case LESS_THAN_EQUALS:
+                return new LessOrEqual(idx, toLiteral(columnType, leaf.getLiteral()));
+            case IN:
+                return leaf.getLiteralList().stream()
+                        .map(o -> (Predicate) new Equal(idx, toLiteral(columnType, o)))
+                        .reduce(Or::new)
+                        .get();
+            case BETWEEN:
+                List<Object> literalList = leaf.getLiteralList();
+                return new And(
+                        new GreaterOrEqual(idx, toLiteral(columnType, literalList.get(0))),
+                        new LessOrEqual(idx, toLiteral(columnType, literalList.get(1))));
+            case IS_NULL:
+                return new IsNull(idx);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported operator " + tree.getOperator());
+        }
+    }
+
+    private Predicate convertNotLeaf(PredicateLeaf leaf) {
+        String columnName = leaf.getColumnName();
+        int idx = columnNames.indexOf(columnName);
+        Preconditions.checkArgument(idx >= 0, "Column " + columnName + " not found.");
+        LogicalType columnType = columnTypes.get(idx);
+        switch (leaf.getOperator()) {
+            case EQUALS:

Review Comment:
   Ditto (I think the negation logic for `not` should live in `flink-table-store-core`.)
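   
   One hypothetical shape for that (names are illustrative, not the current `flink-table-store-core` API): give each core predicate its own negation, so connectors translate NOT uniformly instead of duplicating `convertLeaf`/`convertNotLeaf`:
   
   ```java
   import java.util.Optional;
   
   // Illustrative sketch -- not the actual flink-table-store-core interfaces.
   interface CorePredicate {
       boolean test(Object[] values);
   
       /** Exact logical negation, or empty if none exists. */
       Optional<CorePredicate> negate();
   }
   
   final class IsNullSketch implements CorePredicate {
       private final int fieldIndex;
   
       IsNullSketch(int fieldIndex) {
           this.fieldIndex = fieldIndex;
       }
   
       public boolean test(Object[] values) {
           return values[fieldIndex] == null;
       }
   
       public Optional<CorePredicate> negate() {
           return Optional.of(new IsNotNullSketch(fieldIndex));
       }
   }
   
   final class IsNotNullSketch implements CorePredicate {
       private final int fieldIndex;
   
       IsNotNullSketch(int fieldIndex) {
           this.fieldIndex = fieldIndex;
       }
   
       public boolean test(Object[] values) {
           return values[fieldIndex] != null;
       }
   
       public Optional<CorePredicate> negate() {
           return Optional.of(new IsNullSketch(fieldIndex));
       }
   }
   ```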



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java:
##########
@@ -0,0 +1,233 @@
+/** Converts {@link SearchArgument} to {@link Predicate} with best effort. */
+public class SearchArgumentToPredicateConverter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SearchArgumentToPredicateConverter.class);
+
+    private final ExpressionTree tree;

Review Comment:
   Rename `tree` to `root`?
   There are several places where the exception message references the wrong `tree`, and reusing the same name for the field and the method parameters tends to cause exactly this problem.



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java:
##########
@@ -0,0 +1,233 @@
+    private Predicate convertNotTree(ExpressionTree tree) {

Review Comment:
   I think the negation logic for `not` should live in `flink-table-store-core`.



##########
flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java:
##########
@@ -91,65 +101,115 @@ public RecordReader<Void, RowDataContainer> getRecordReader(
                 splitLength);
     }
 
-    private FileStoreImpl createFileStore(JobConf jobConf) {
-        TableStoreJobConf wrapper = new TableStoreJobConf(jobConf);
-
-        String dbName = wrapper.getDbName();
-        String tableName = wrapper.getTableName();
-
-        Configuration options = new Configuration();
-        String tableLocation = wrapper.getLocation();
-        wrapper.updateFileStoreOptions(options);
-
-        String user = wrapper.getFileStoreUser();
-
-        List<String> columnNames = wrapper.getColumnNames();
-        List<LogicalType> columnTypes = wrapper.getColumnTypes();
-
-        List<String> partitionColumnNames = wrapper.getPartitionColumnNames();
-
-        RowType rowType =
-                RowType.of(
-                        columnTypes.toArray(new LogicalType[0]),
-                        columnNames.toArray(new String[0]));
-        LogicalType[] partitionLogicalTypes =
-                partitionColumnNames.stream()
-                        .map(s -> columnTypes.get(columnNames.indexOf(s)))
-                        .toArray(LogicalType[]::new);
-        RowType partitionType =
-                RowType.of(partitionLogicalTypes, partitionColumnNames.toArray(new String[0]));
-
-        Optional<List<String>> optionalPrimaryKeyNames = wrapper.getPrimaryKeyNames();
-        if (optionalPrimaryKeyNames.isPresent()) {
-            Function<String, RowType.RowField> rowFieldMapper =
-                    s -> {
-                        int idx = columnNames.indexOf(s);
-                        Preconditions.checkState(
-                                idx >= 0,
-                                "Primary key column "
-                                        + s
-                                        + " not found in table "
-                                        + dbName
-                                        + "."
-                                        + tableName);
-                        return new RowType.RowField(s, columnTypes.get(idx));
-                    };
-            RowType primaryKeyType =
-                    new RowType(
-                            optionalPrimaryKeyNames.get().stream()
-                                    .map(rowFieldMapper)
-                                    .collect(Collectors.toList()));
-            return FileStoreImpl.createWithPrimaryKey(
-                    tableLocation,
-                    new FileStoreOptions(options),
-                    user,
-                    partitionType,
-                    primaryKeyType,
-                    rowType,
-                    FileStoreOptions.MergeEngine.DEDUPLICATE);
-        } else {
-            return FileStoreImpl.createWithValueCount(
-                    tableLocation, new FileStoreOptions(options), user, partitionType, rowType);
+    private static class FileStoreWrapper {

Review Comment:
   Maybe we should have a `FileStoreFactory` in `flink-table-store-core` after https://github.com/apache/flink-table-store/pull/101 is merged.
   Then we could create the `Predicate` from that (and maybe get the row type from it as well).
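   
   A rough sketch of that direction (every name below is hypothetical until #101 settles the API):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.table.store.file.FileStore;   // assumed core interface
   import org.apache.flink.table.types.logical.RowType;
   
   // Hypothetical factory in flink-table-store-core: connectors would build
   // the store and derive Predicates in one place instead of re-assembling
   // row types and options themselves, as FileStoreWrapper does above.
   interface FileStoreFactorySketch {
   
       /** Build the store from (de)serialized table options, e.g. from a JobConf. */
       FileStore create(Configuration options);
   
       /** The table row type, so connectors can map column names to field
        *  indices when building Predicates. */
       RowType rowType();
   }
   ```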



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org