Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2020/05/27 07:32:00 UTC

[GitHub] [bahir-flink] gyfora commented on a change in pull request #82: Add batch table env support and filter push down

gyfora commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r430908058



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();
+        predicates.addAll(unsupportedExpressions);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, kuduPredicates, projectedFields);
+    }
+
+    /**
+     * Converts Flink Expression to KuduFilterInfo.
+     */
+    @Nullable
+    private KuduFilterInfo toKuduFilterInfo(Expression predicate) {

Review comment:
       Could we move all the logic for translating an Expression into a KuduFilterInfo to a utility class? That would keep the table source cleaner. For example:
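
   A minimal sketch of what that could look like (the class name and package below are just suggestions, not existing code):

       package org.apache.flink.connectors.kudu.table.utils;

       import javax.annotation.Nullable;

       import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
       import org.apache.flink.table.expressions.Expression;

       // Hypothetical utility class holding the Expression -> KuduFilterInfo
       // translation logic.
       public final class KuduTableUtils {

           private KuduTableUtils() {
               // static utility, no instances
           }

           // The body would be the conversion code currently in
           // KuduTableSource#toKuduFilterInfo, moved here unchanged.
           @Nullable
           public static KuduFilterInfo toKuduFilterInfo(Expression predicate) {
               return null; // placeholder for the moved translation logic
           }
       }

   KuduTableSource#applyPredicate would then just call KuduTableUtils.toKuduFilterInfo(pred).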

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -17,55 +17,106 @@
 
 package org.apache.flink.connectors.kudu.table;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.planner.expressions.Attribute;
+import org.apache.flink.table.planner.expressions.BinaryComparison;
+import org.apache.flink.table.planner.expressions.EqualTo;
+import org.apache.flink.table.planner.expressions.GreaterThan;
+import org.apache.flink.table.planner.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.planner.expressions.IsNotNull;
+import org.apache.flink.table.planner.expressions.IsNull;
+import org.apache.flink.table.planner.expressions.LessThan;
+import org.apache.flink.table.planner.expressions.LessThanOrEqual;
+import org.apache.flink.table.planner.expressions.Literal;
+import org.apache.flink.table.planner.expressions.UnaryExpression;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
-public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row> {
+public class KuduTableSource extends InputFormatTableSource<Row> implements
+    LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduTableSource.class);
+
+    @SuppressWarnings("unchecked")
+    private static final Set<BasicTypeInfo> VALID_LITERAL_TYPE = new HashSet() {{

Review comment:
       This should probably also move to the utility class, together with the rest of the filter logic.
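
   If it moves there, the raw double-brace initializer could also become a plain unmodifiable set, e.g. (the entries below are illustrative only; the real set keeps whatever members it has today):

       private static final Set<BasicTypeInfo<?>> VALID_LITERAL_TYPES =
           Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
               BasicTypeInfo.STRING_TYPE_INFO,   // illustrative entry
               BasicTypeInfo.INT_TYPE_INFO,      // illustrative entry
               BasicTypeInfo.LONG_TYPE_INFO)));  // illustrative entry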

##########
File path: flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class KuduTableSourceTest extends KuduTestBase {
+    private BatchTableEnvironment tableEnv;
+    private KuduCatalog catalog;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        tableEnv = KuduTableTestUtils.createBatchTableEnvWithBlinkPlannerBatchMode(env);
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        tableEnv.registerCatalog("kudu", catalog);
+        tableEnv.useCatalog("kudu");
+    }
+
+    @Test
+    public void testFullScan() throws Exception {
+        Table table = tableEnv.sqlQuery("select * from books order by id");
+        DataSet<Row> dataSet = tableEnv.toDataSet(table, Row.class);
+        List<Row> result = dataSet.collect();
+        // check result
+        assertEquals(5, result.size());
+        assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11",
+            result.get(0).toString());
+        tableEnv.sqlUpdate("DROP TABLE books");
+    }
+
+    @Test
+    public void testScanWithProjectionAndFilter() throws Exception {

Review comment:
       This test alone cannot verify that the filters were actually pushed down and that the push-down works correctly.
   
   We should also add unit-style tests that validate the created KuduFilterInfos and exercise the KuduTableSource directly: applyPredicate, isFilterPushedDown, and so on (see the sketch below).
   
   The problem is that this change adds a lot of different filtering logic that can easily break the output of SQL queries if it is incorrect, so we really want to make sure it is tested.
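
   For example, something along these lines in KuduTableSourceTest (untested sketch; buildTestSource() and eqExpression() are hypothetical helpers that would construct the table source and a supported "field = literal" Expression):

       @Test
       public void testApplyPredicate() {
           KuduTableSource source = buildTestSource();
           List<Expression> predicates = new ArrayList<>();
           predicates.add(eqExpression("id", 1001));

           KuduTableSource filtered = (KuduTableSource) source.applyPredicate(predicates);

           // the returned source must report the push-down
           assertTrue(filtered.isFilterPushedDown());
           // a successfully converted predicate must be removed from the list
           assertTrue(predicates.isEmpty());
       }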

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();

Review comment:
       The Javadoc of applyPredicate specifies that we should remove only the successfully applied expressions from the list.
   I know it's not a big difference, but it would be better to iterate with an iterator and remove each expression as it is pushed down.
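
   Untested sketch of the iterator variant (logging omitted; needs java.util.Iterator):

       Iterator<Expression> it = predicates.iterator();
       while (it.hasNext()) {
           Expression pred = it.next();
           KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
           if (kuduPred != null) {
               kuduPredicates.add(kuduPred);
               it.remove(); // drop only the expressions actually pushed down
           }
       }

   That way the unsupported expressions stay in the list and the explicit clear()/addAll() is no longer needed.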




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org