Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2020/05/21 07:20:40 UTC

[GitHub] [bahir-flink] sebastianliu opened a new pull request #82: add batch table env support and filter push down

sebastianliu opened a new pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82


   I was glad to see that the community's Flink-Kudu connector has been reworked recently. The Table API is now supported, which enables better interaction between Flink SQL and Kudu. However, there is currently no support for Flink batch SQL, including filter push-down. In this PR, I change KuduTableSource to inherit from InputFormatTableSource so that it supports both streaming SQL and batch SQL at the same time. To reduce unnecessary data transmission, I also add filter push-down to the KuduTableSource.
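   
   For context, a minimal sketch of how the combined batch support and filter push-down can be exercised, modeled on the tests added in this PR (the `books` table and the `KuduTableTestUtils` helper come from those tests; `masterAddresses` stands in for your Kudu master address string):
   
       // imports match those in KuduTableSourceTest (see the diffs below)
       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       BatchTableEnvironment tableEnv =
           KuduTableTestUtils.createBatchTableEnvWithBlinkPlannerBatchMode(env);
       tableEnv.registerCatalog("kudu", new KuduCatalog(masterAddresses));
       tableEnv.useCatalog("kudu");
       // With filter push-down, the WHERE clause below can be translated into a
       // KuduFilterInfo and evaluated by Kudu instead of being filtered in Flink.
       Table table = tableEnv.sqlQuery("SELECT * FROM books WHERE id > 1001");
       List<Row> result = tableEnv.toDataSet(table, Row.class).collect();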


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] lresende commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
lresende commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-634450086


   @gyfora do you want to take a look at this as well?





[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r444033913



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
##########
@@ -65,6 +65,7 @@
     public static final String KUDU_HASH_COLS = "kudu.hash-columns";
     public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
     public static final String KUDU_REPLICAS = "kudu.replicas";
+    public static final String KUDU_IS_BOUNDED = "kudu.is-bounded";

Review comment:
       Already removed this unnecessary flag.







[GitHub] [bahir-flink] gyfora commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r438728912



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
##########
@@ -65,6 +65,7 @@
     public static final String KUDU_HASH_COLS = "kudu.hash-columns";
     public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
     public static final String KUDU_REPLICAS = "kudu.replicas";
+    public static final String KUDU_IS_BOUNDED = "kudu.is-bounded";

Review comment:
       Why are we introducing the is-bounded flag? As far as I can tell this only affects the isBounded() method of the KuduTableSource and not the actual reading of the data.
   
   The Kudu reading logic at the moment reads the table contents at the start of the job, so it is always bounded; I think we should not touch this logic now.
   
   What do you think?
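   
   A minimal sketch of the always-bounded alternative being suggested (no configuration flag involved):
   
       @Override
       public boolean isBounded() {
           // The current reading logic scans the Kudu table once at job start,
           // so the source is always bounded.
           return true;
       }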







[GitHub] [bahir-flink] gyfora commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-634471965


   Thanks @lresende, I can take a look today :)





[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r438062447



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -17,55 +17,106 @@
 
 package org.apache.flink.connectors.kudu.table;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.planner.expressions.Attribute;
+import org.apache.flink.table.planner.expressions.BinaryComparison;
+import org.apache.flink.table.planner.expressions.EqualTo;
+import org.apache.flink.table.planner.expressions.GreaterThan;
+import org.apache.flink.table.planner.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.planner.expressions.IsNotNull;
+import org.apache.flink.table.planner.expressions.IsNull;
+import org.apache.flink.table.planner.expressions.LessThan;
+import org.apache.flink.table.planner.expressions.LessThanOrEqual;
+import org.apache.flink.table.planner.expressions.Literal;
+import org.apache.flink.table.planner.expressions.UnaryExpression;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
-public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row> {
+public class KuduTableSource extends InputFormatTableSource<Row> implements
+    LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduTableSource.class);
+
+    @SuppressWarnings("unchecked")
+    private static final Set<BasicTypeInfo> VALID_LITERAL_TYPE = new HashSet() {{

Review comment:
       Moved to KuduTableUtils







[GitHub] [bahir-flink] gyfora commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r438816294



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +147,41 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo and pushed into " +
+                    "KuduTable [{}].", predicate, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+                predicatesIter.remove();
+            } else {
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",

Review comment:
       debug log maybe?
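   
   For illustration, the suggestion would only lower the log level of the existing message (sketch):
   
       LOG.debug("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
           predicate, tableInfo.getName());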

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +147,41 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);

Review comment:
       It would be nicer to return an Optional here, but I'll leave this up to you.
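   
   A sketch of the Optional-based variant being suggested (the changed signature of toKuduFilterInfo is illustrative):
   
       Optional<KuduFilterInfo> kuduPred = toKuduFilterInfo(predicate); // now returns Optional<KuduFilterInfo>
       if (kuduPred.isPresent()) {
           kuduPredicates.add(kuduPred.get());
           predicatesIter.remove();
       }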

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +147,41 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo and pushed into " +

Review comment:
       Should we use debug logging here?

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +147,41 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo and pushed into " +
+                    "KuduTable [{}].", predicate, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+                predicatesIter.remove();
+            } else {
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    predicate, tableInfo.getName());
+            }
+        }
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, kuduPredicates, projectedFields);
     }
 
     @Override
     public String explainSource() {
-        return "KuduStreamTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames())
-                + (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+        return "KuduTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames()) +
+            ", filter=" + predicateString() +
+            (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+    }
+
+    private String predicateString() {
+        if (predicates == null || predicates.size() == 0) {
+            return "FALSE";

Review comment:
       Maybe instead of FALSE, something like "No filters / predicates"?
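   
   For example (the exact wording is illustrative):
   
       if (predicates == null || predicates.isEmpty()) {
           return "No predicates pushed down"; // instead of "FALSE"
       }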







[GitHub] [bahir-flink] sebastianliu commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-646557950


   > I had a few minor comments but it looks pretty good in general.
   > 
   > I also commented already on the is-bounded question; I think we should remove that logic from this PR as it does not introduce new functionality or change anything at the moment.
   
   @gyfora Hi Gyula, I really appreciate your code review. Regarding the `is-bounded` comment, I replied with some of my thoughts, but I didn't resolve it. Looking forward to further discussion and review. Thanks again :)





[GitHub] [bahir-flink] sebastianliu commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-648540901


   Hi @lresende, I would greatly appreciate it if you could take a look at this; all comments have been resolved so far. Let me know if you need further input from me.





[GitHub] [bahir-flink] sebastianliu commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-642004251


   @gyfora I appreciate you taking the time to review this PR. I have resolved all of the above comments; I hope you can do another round of review. Sorry for the delay.





[GitHub] [bahir-flink] sebastianliu commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-647978214


   @gyfora Hi Gyula, thanks for your explanation. The is-bounded flag is indeed unnecessary, and I have removed it. I hope you can review again. Thanks for your time~





[GitHub] [bahir-flink] gyfora commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
gyfora commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r430908058



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();
+        predicates.addAll(unsupportedExpressions);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, kuduPredicates, projectedFields);
+    }
+
+    /**
+     * Converts Flink Expression to KuduFilterInfo.
+     */
+    @Nullable
+    private KuduFilterInfo toKuduFilterInfo(Expression predicate) {

Review comment:
       Could we move all the logic translating from Expression to KuduFilterInfo into a utility class? That would leave the source cleaner.
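   
   A sketch of the extraction being suggested (the class name KuduTableUtils matches where the logic eventually landed; the method shape is illustrative):
   
       public final class KuduTableUtils {
           private KuduTableUtils() {}
   
           /** Converts a Flink Expression into a KuduFilterInfo, or null if it cannot be pushed down. */
           @Nullable
           public static KuduFilterInfo toKuduFilterInfo(Expression predicate) {
               // ... translation logic moved here from KuduTableSource ...
               return null;
           }
       }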

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -17,55 +17,106 @@
 
 package org.apache.flink.connectors.kudu.table;
 
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.planner.expressions.Attribute;
+import org.apache.flink.table.planner.expressions.BinaryComparison;
+import org.apache.flink.table.planner.expressions.EqualTo;
+import org.apache.flink.table.planner.expressions.GreaterThan;
+import org.apache.flink.table.planner.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.planner.expressions.IsNotNull;
+import org.apache.flink.table.planner.expressions.IsNull;
+import org.apache.flink.table.planner.expressions.LessThan;
+import org.apache.flink.table.planner.expressions.LessThanOrEqual;
+import org.apache.flink.table.planner.expressions.Literal;
+import org.apache.flink.table.planner.expressions.UnaryExpression;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
-public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row> {
+public class KuduTableSource extends InputFormatTableSource<Row> implements
+    LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduTableSource.class);
+
+    @SuppressWarnings("unchecked")
+    private static final Set<BasicTypeInfo> VALID_LITERAL_TYPE = new HashSet() {{

Review comment:
       This should probably go into a utility class together with all the filter logic.

##########
File path: flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class KuduTableSourceTest extends KuduTestBase {
+    private BatchTableEnvironment tableEnv;
+    private KuduCatalog catalog;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        tableEnv = KuduTableTestUtils.createBatchTableEnvWithBlinkPlannerBatchMode(env);
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        tableEnv.registerCatalog("kudu", catalog);
+        tableEnv.useCatalog("kudu");
+    }
+
+    @Test
+    public void testFullScan() throws Exception {
+        Table table = tableEnv.sqlQuery("select * from books order by id");
+        DataSet<Row> dataSet = tableEnv.toDataSet(table, Row.class);
+        List<Row> result = dataSet.collect();
+        // check result
+        assertEquals(5, result.size());
+        assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11",
+            result.get(0).toString());
+        tableEnv.sqlUpdate("DROP TABLE books");
+    }
+
+    @Test
+    public void testScanWithProjectionAndFilter() throws Exception {

Review comment:
       With this test we cannot be sure that the filters were actually pushed down and work correctly.
   
   We should also add a unit-test-style test that validates the KuduFilterInfos created and exercises the KuduTableSource directly: things like applyPredicate, isFilterPushedDown, and all the rest.
   
   The problem here is that we are adding a lot of different filtering logic that can easily break the output of SQL queries if incorrect, so we really want to make sure it is tested.
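   
   A sketch of the kind of direct check being asked for (the createTableSource() helper and the pushable predicate are hypothetical):
   
       @Test
       void testApplyPredicate() {
           KuduTableSource source = createTableSource();      // hypothetical helper
           List<Expression> predicates = new ArrayList<>();
           predicates.add(pushablePredicate);                 // hypothetical Expression, e.g. id > 1001
           KuduTableSource filtered = (KuduTableSource) source.applyPredicate(predicates);
           assertTrue(filtered.isFilterPushedDown());
           assertTrue(predicates.isEmpty()); // pushed-down predicates must be removed from the list
       }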

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();

Review comment:
       The Javadoc of applyPredicate specifies that we should remove only the applicable expressions from the list.
   I know it's not a large difference, but it might be better to use an iterator and remove each expression once it is pushed down.
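   
   A sketch of the iterator-based removal (this matches the shape the code later took in this PR):
   
       ListIterator<Expression> predicatesIter = predicates.listIterator();
       while (predicatesIter.hasNext()) {
           Expression predicate = predicatesIter.next();
           KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);
           if (kuduPred != null) {
               kuduPredicates.add(kuduPred);
               predicatesIter.remove(); // only pushed-down expressions leave the list
           }
       }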







[GitHub] [bahir-flink] sebastianliu commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-631937516


   @lresende I appreciate you taking the time to review this PR.





[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r438063271



##########
File path: flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class KuduTableSourceTest extends KuduTestBase {
+    private BatchTableEnvironment tableEnv;
+    private KuduCatalog catalog;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        tableEnv = KuduTableTestUtils.createBatchTableEnvWithBlinkPlannerBatchMode(env);
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        tableEnv.registerCatalog("kudu", catalog);
+        tableEnv.useCatalog("kudu");
+    }
+
+    @Test
+    public void testFullScan() throws Exception {
+        Table table = tableEnv.sqlQuery("select * from books order by id");
+        DataSet<Row> dataSet = tableEnv.toDataSet(table, Row.class);
+        List<Row> result = dataSet.collect();
+        // check result
+        assertEquals(5, result.size());
+        assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11",
+            result.get(0).toString());
+        tableEnv.sqlUpdate("DROP TABLE books");
+    }
+
+    @Test
+    public void testScanWithProjectionAndFilter() throws Exception {

Review comment:
       Good point. I have changed this test to be an integration test and added a new unit-test-style test for KuduTableSource. Thanks!







[GitHub] [bahir-flink] lresende merged pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
lresende merged pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82


   





[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r438062144



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();
+        predicates.addAll(unsupportedExpressions);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, kuduPredicates, projectedFields);
+    }
+
+    /**
+     * Converts Flink Expression to KuduFilterInfo.
+     */
+    @Nullable
+    private KuduFilterInfo toKuduFilterInfo(Expression predicate) {

Review comment:
       done







[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r438061927



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +156,167 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        // try to convert Flink filter expressions to Kudu Filter Info
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>(predicates.size());
+        List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+        for (Expression pred : predicates) {
+            KuduFilterInfo kuduPred = toKuduFilterInfo(pred);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo [{}] and pushed into " +
+                    "KuduTable [{}].", pred, kuduPred, tableInfo.getName());
+                kuduPredicates.add(kuduPred);
+            } else {
+                unsupportedExpressions.add(pred);
+                LOG.info("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    pred, tableInfo.getName());
+            }
+        }
+        // update list of Flink expressions to unsupported expressions
+        predicates.clear();

Review comment:
       Fixed. This has been changed to use an iterator. Thanks for the reminder!







[GitHub] [bahir-flink] gyfora commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-647410546


   > > I had a few minor comments but it looks pretty good in general.
   > > I also commented already on the is-bounded question; I think we should remove that logic from this PR as it does not introduce new functionality or change anything at the moment.
   > 
   > @gyfora Hi Gyula, I really appreciate your code review. Regarding the `is-bounded` comment, I replied with some of my thoughts, but I didn't resolve it. Looking forward to further discussion and review. Thanks again :)
   
   As far as I understand the logic on the Flink side, the is-bounded flag does not affect the streaming execution; it should reflect the actual source implementation.
   
   The Kudu table source is always bounded right now. Therefore we should always return isBounded = true and not expose any option to change this, as that might lead to incorrect/unexpected behavior.





[GitHub] [bahir-flink] gyfora commented on pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
gyfora commented on pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#issuecomment-642010859


   Thank you, I will look at this first thing tomorrow :) 





[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r442755855



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
##########
@@ -65,6 +65,7 @@
     public static final String KUDU_HASH_COLS = "kudu.hash-columns";
     public static final String KUDU_PRIMARY_KEY_COLS = "kudu.primary-key-columns";
     public static final String KUDU_REPLICAS = "kudu.replicas";
+    public static final String KUDU_IS_BOUNDED = "kudu.is-bounded";

Review comment:
       The `is-bounded` flag really just controls the `isBounded()` method in
   `KuduTableSource`, which is inherited from `StreamTableSource`. The main reason I added it is that some users may want to use this Kudu connector in a Flink batch SQL job, which is usually run under the Blink batch mode. I also added a few notes to the README file. In addition, I kept the default behavior of the `is-bounded` flag, which is false in `StreamTableSource`. :)







[GitHub] [bahir-flink] sebastianliu commented on a change in pull request #82: Add batch table env support and filter push down

Posted by GitBox <gi...@apache.org>.
sebastianliu commented on a change in pull request #82:
URL: https://github.com/apache/bahir-flink/pull/82#discussion_r442750071



##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +147,41 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);

Review comment:
       Good suggestion; this has been changed to use an `Optional`.

##########
File path: flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
##########
@@ -109,12 +147,41 @@ public boolean isLimitPushedDown() {
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, bounded, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            KuduFilterInfo kuduPred = toKuduFilterInfo(predicate);
+            if (kuduPred != null) {
+                LOG.info("Predicate [{}] converted into KuduFilterInfo and pushed into " +

Review comment:
       fixed



