You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original archive page) is not preserved in this copy.
Posted to commits@bahir.apache.org by lr...@apache.org on 2020/07/14 03:25:54 UTC

[bahir-flink] branch master updated: Add batch table env support and filter push down to Kudu connector (#82)

This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e215544  Add batch table env support and filter push down to Kudu connector (#82)
e215544 is described below

commit e21554484faff9169c7c53784436a28fbf5205b5
Author: Sebastian Liu <li...@gmail.com>
AuthorDate: Tue Jul 14 11:25:44 2020 +0800

    Add batch table env support and filter push down to Kudu connector (#82)
    
    Update the KuduTableSource to inherit from InputFormatTableSource
    in order to support both streaming SQL and batch SQL at the same time.
    To reduce unnecessary data transmission, filter push-down
    was also added to the KuduTableSource.
---
 flink-connector-kudu/README.md                     |   2 +-
 flink-connector-kudu/pom.xml                       |   2 +-
 .../connectors/kudu/connector/KuduFilterInfo.java  |  14 +-
 .../flink/connectors/kudu/table/KuduCatalog.java   |   5 +-
 .../connectors/kudu/table/KuduCatalogFactory.java  |   4 +-
 .../connectors/kudu/table/KuduTableFactory.java    |   2 +-
 .../connectors/kudu/table/KuduTableSource.java     |  96 +++++++++--
 .../kudu/table/utils/KuduTableUtils.java           | 142 ++++++++++++++++
 .../connectors/kudu/table/KuduCatalogTest.java     |   2 +-
 .../kudu/table/KuduTableFactoryTest.java           |   2 +-
 .../kudu/table/KuduTableSourceITCase.java          |  67 ++++++++
 .../connectors/kudu/table/KuduTableSourceTest.java | 181 +++++++++++++++++++++
 .../connectors/kudu/table/KuduTableTestUtils.java  |  10 +-
 13 files changed, 496 insertions(+), 33 deletions(-)

diff --git a/flink-connector-kudu/README.md b/flink-connector-kudu/README.md
index 14c13eb..6370aa6 100644
--- a/flink-connector-kudu/README.md
+++ b/flink-connector-kudu/README.md
@@ -184,7 +184,7 @@ are described as being nullable, and not being primary keys.
 
 ## DataStream API
 
-It is also possible to use the the Kudu connector directly from the DataStream API however we
+It is also possible to use the Kudu connector directly from the DataStream API however we
 encourage all users to explore the Table API as it provides a lot of useful tooling when working
 with Kudu data.
 
diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml
index fe7887c..3bbefee 100644
--- a/flink-connector-kudu/pom.xml
+++ b/flink-connector-kudu/pom.xml
@@ -52,7 +52,7 @@
 
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-planner-blink_2.11</artifactId>
+      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 0a89cad..08fa86b 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -68,25 +68,25 @@ public class KuduFilterInfo implements Serializable {
                 predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value);
                 break;
             case FLOAT:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value);
                 break;
             case INT8:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (byte) this.value);
                 break;
             case INT16:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (short) this.value);
                 break;
             case INT32:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (int) this.value);
                 break;
             case INT64:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (long) this.value);
                 break;
             case DOUBLE:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (double) this.value);
                 break;
             case BOOL:
-                predicate = KuduPredicate.newComparisonPredicate(column, comparison, this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, comparison, (boolean) this.value);
                 break;
             case UNIXTIME_MICROS:
                 Long time = (Long) this.value;
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
index 7b3c987..2ca7c0e 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java
@@ -64,7 +64,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_HASH_COLS;
-import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_MASTERS;
 import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_PRIMARY_KEY_COLS;
 import static org.apache.flink.connectors.kudu.table.KuduTableFactory.KUDU_REPLICAS;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -183,9 +182,9 @@ public class KuduCatalog extends AbstractReadOnlyCatalog {
 
     protected Map<String, String> createTableProperties(String tableName, List<ColumnSchema> primaryKeyColumns) {
         Map<String, String> props = new HashMap<>();
-        props.put(KUDU_MASTERS, kuduMasters);
+        props.put(KuduTableFactory.KUDU_MASTERS, kuduMasters);
         String primaryKeyNames = primaryKeyColumns.stream().map(ColumnSchema::getName).collect(Collectors.joining(","));
-        props.put(KUDU_PRIMARY_KEY_COLS, primaryKeyNames);
+        props.put(KuduTableFactory.KUDU_PRIMARY_KEY_COLS, primaryKeyNames);
         props.put(KuduTableFactory.KUDU_TABLE, tableName);
         return props;
     }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
index 1018cae..30aaa40 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalogFactory.java
@@ -62,14 +62,14 @@ public class KuduCatalogFactory implements CatalogFactory {
     @Override
     public Catalog createCatalog(String name, Map<String, String> properties) {
         final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
-        return new KuduCatalog(name, descriptorProperties.getString(KuduTableFactory.KUDU_MASTERS));
+        return new KuduCatalog(name,
+            descriptorProperties.getString(KuduTableFactory.KUDU_MASTERS));
     }
 
     private DescriptorProperties getValidatedProperties(Map<String, String> properties) {
         final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
         descriptorProperties.putProperties(properties);
         descriptorProperties.validateString(KuduTableFactory.KUDU_MASTERS, false);
-
         return descriptorProperties;
     }
 
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
index 49b09a2..eb72205 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java
@@ -138,7 +138,7 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto
         KuduReaderConfig.Builder configBuilder = KuduReaderConfig.Builder
                 .setMasters(masterAddresses);
 
-        return new KuduTableSource(configBuilder, tableInfo, physicalSchema, null);
+        return new KuduTableSource(configBuilder, tableInfo, physicalSchema, null, null);
     }
 
     @Override
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
index 2c972fb..db73df3 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSource.java
@@ -19,12 +19,15 @@ package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connectors.kudu.batch.KuduRowInputFormat;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.connector.reader.KuduReaderConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.sources.FilterableTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -35,30 +38,63 @@ import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
 
-public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSource<Row>, ProjectableTableSource<Row> {
+import static org.apache.flink.connectors.kudu.table.utils.KuduTableUtils.toKuduFilterInfo;
+
+public class KuduTableSource implements StreamTableSource<Row>,
+    LimitableTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KuduTableSource.class);
 
     private final KuduReaderConfig.Builder configBuilder;
     private final KuduTableInfo tableInfo;
     private final TableSchema flinkSchema;
     private final String[] projectedFields;
+    // predicate expression to apply
+    @Nullable
+    private final List<KuduFilterInfo> predicates;
+    private boolean isFilterPushedDown;
 
-    public KuduTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo, TableSchema flinkSchema, String[] projectedFields) {
+    private KuduRowInputFormat kuduRowInputFormat;
+
+    public KuduTableSource(KuduReaderConfig.Builder configBuilder, KuduTableInfo tableInfo,
+        TableSchema flinkSchema, List<KuduFilterInfo> predicates, String[] projectedFields) {
         this.configBuilder = configBuilder;
         this.tableInfo = tableInfo;
         this.flinkSchema = flinkSchema;
+        this.predicates = predicates;
         this.projectedFields = projectedFields;
+        if (predicates != null && predicates.size() != 0) {
+            this.isFilterPushedDown = true;
+        }
+        this.kuduRowInputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo,
+            predicates == null ? Collections.emptyList() : predicates,
+            projectedFields == null ? null : Lists.newArrayList(projectedFields));
     }
 
     @Override
-    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
-
-        KuduRowInputFormat inputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo, projectedFields == null ? null : Lists.newArrayList(projectedFields));
+    public boolean isBounded() {
+        return true;
+    }
 
-        return env.createInput(inputFormat, (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())).name(explainSource());
+    @Override
+    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
+        KuduRowInputFormat inputFormat = new KuduRowInputFormat(configBuilder.build(), tableInfo,
+            predicates == null ? Collections.emptyList() : predicates,
+            projectedFields == null ? null : Lists.newArrayList(projectedFields));
+        return env.createInput(inputFormat,
+            (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType()))
+            .name(explainSource());
     }
 
     @Override
@@ -67,6 +103,11 @@ public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSo
     }
 
     @Override
+    public boolean isFilterPushedDown() {
+        return this.isFilterPushedDown;
+    }
+
+    @Override
     public DataType getProducedDataType() {
         if (projectedFields == null) {
             return flinkSchema.toRowDataType();
@@ -87,18 +128,14 @@ public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSo
     }
 
     @Override
-    public boolean isBounded() {
-        return true;
-    }
-
-    @Override
     public boolean isLimitPushedDown() {
         return true;
     }
 
     @Override
     public TableSource<Row> applyLimit(long l) {
-        return new KuduTableSource(configBuilder.setRowLimit((int) l), tableInfo, flinkSchema, projectedFields);
+        return new KuduTableSource(configBuilder.setRowLimit((int) l), tableInfo, flinkSchema,
+            predicates, projectedFields);
     }
 
     @Override
@@ -109,12 +146,41 @@ public class KuduTableSource implements StreamTableSource<Row>, LimitableTableSo
         for (int i = 0; i < ints.length; i++) {
             fieldNames[i] = prevFieldNames.get(ints[i]);
         }
-        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, fieldNames);
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, predicates, fieldNames);
+    }
+
+    @Override
+    public TableSource<Row> applyPredicate(List<Expression> predicates) {
+        List<KuduFilterInfo> kuduPredicates = new ArrayList<>();
+        ListIterator<Expression> predicatesIter = predicates.listIterator();
+        while(predicatesIter.hasNext()) {
+            Expression predicate = predicatesIter.next();
+            Optional<KuduFilterInfo> kuduPred = toKuduFilterInfo(predicate);
+            if (kuduPred != null && kuduPred.isPresent()) {
+                LOG.debug("Predicate [{}] converted into KuduFilterInfo and pushed into " +
+                    "KuduTable [{}].", predicate, tableInfo.getName());
+                kuduPredicates.add(kuduPred.get());
+                predicatesIter.remove();
+            } else {
+                LOG.debug("Predicate [{}] could not be pushed into KuduFilterInfo for KuduTable [{}].",
+                    predicate, tableInfo.getName());
+            }
+        }
+        return new KuduTableSource(configBuilder, tableInfo, flinkSchema, kuduPredicates, projectedFields);
     }
 
     @Override
     public String explainSource() {
-        return "KuduStreamTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames())
-                + (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+        return "KuduTableSource[schema=" + Arrays.toString(getTableSchema().getFieldNames()) +
+            ", filter=" + predicateString() +
+            (projectedFields != null ?", projectFields=" + Arrays.toString(projectedFields) + "]" : "]");
+    }
+
+    private String predicateString() {
+        if (predicates == null || predicates.size() == 0) {
+            return "No predicates push down";
+        } else {
+            return "AND(" + predicates + ")";
+        }
     }
 }
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
index 5fc8cca..53f205d 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java
@@ -20,9 +20,16 @@ package org.apache.flink.connectors.kudu.table.utils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.kudu.connector.ColumnSchemasFactory;
 import org.apache.flink.connectors.kudu.connector.CreateTableOptionsFactory;
+import org.apache.flink.connectors.kudu.connector.KuduFilterInfo;
 import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
 import org.apache.flink.connectors.kudu.table.KuduTableFactory;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.TimestampType;
@@ -37,7 +44,9 @@ import org.apache.kudu.client.CreateTableOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -133,4 +142,137 @@ public class KuduTableUtils {
                 });
         return builder.build();
     }
+
+    /**
+     * Converts Flink Expression to KuduFilterInfo.
+     */
+    @Nullable
+    public static Optional<KuduFilterInfo> toKuduFilterInfo(Expression predicate) {
+        LOG.debug("predicate summary: [{}], class: [{}], children: [{}]",
+            predicate.asSummaryString(), predicate.getClass(), predicate.getChildren());
+        if (predicate instanceof CallExpression) {
+            CallExpression callExpression = (CallExpression) predicate;
+            FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
+            List<Expression> children = callExpression.getChildren();
+            if (children.size() == 1) {
+                return convertUnaryIsNullExpression(functionDefinition, children);
+            } else if (children.size() == 2 &&
+                !functionDefinition.equals(BuiltInFunctionDefinitions.OR)) {
+                return convertBinaryComparison(functionDefinition, children);
+            } else if (children.size() > 0 && functionDefinition.equals(BuiltInFunctionDefinitions.OR)) {
+                return convertIsInExpression(children);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static boolean isFieldReferenceExpression(Expression exp) {
+        return exp instanceof FieldReferenceExpression;
+    }
+
+    private static boolean isValueLiteralExpression(Expression exp) {
+        return exp instanceof ValueLiteralExpression;
+    }
+
+    private static Optional<KuduFilterInfo> convertUnaryIsNullExpression(
+        FunctionDefinition functionDefinition, List<Expression> children) {
+        FieldReferenceExpression fieldReferenceExpression;
+        if (isFieldReferenceExpression(children.get(0))) {
+            fieldReferenceExpression = (FieldReferenceExpression) children.get(0);
+        } else {
+            return Optional.empty();
+        }
+        // IS_NULL IS_NOT_NULL
+        String columnName = fieldReferenceExpression.getName();
+        KuduFilterInfo.Builder builder = KuduFilterInfo.Builder.create(columnName);
+        if (functionDefinition.equals(BuiltInFunctionDefinitions.IS_NULL)) {
+            return Optional.of(builder.isNull().build());
+        } else if (functionDefinition.equals(BuiltInFunctionDefinitions.IS_NOT_NULL)) {
+            return Optional.of(builder.isNotNull().build());
+        }
+        return Optional.empty();
+    }
+
+    private static Optional<KuduFilterInfo> convertBinaryComparison(
+        FunctionDefinition functionDefinition, List<Expression> children) {
+        FieldReferenceExpression fieldReferenceExpression;
+        ValueLiteralExpression valueLiteralExpression;
+        if (isFieldReferenceExpression(children.get(0)) &&
+            isValueLiteralExpression(children.get(1))) {
+            fieldReferenceExpression = (FieldReferenceExpression) children.get(0);
+            valueLiteralExpression = (ValueLiteralExpression) children.get(1);
+        } else if (isValueLiteralExpression(children.get(0)) &&
+            isFieldReferenceExpression(children.get(1))) {
+            fieldReferenceExpression = (FieldReferenceExpression) children.get(1);
+            valueLiteralExpression = (ValueLiteralExpression) children.get(0);
+        } else {
+            return Optional.empty();
+        }
+        String columnName = fieldReferenceExpression.getName();
+        Object value = extractValueLiteral(fieldReferenceExpression, valueLiteralExpression);
+        if (value == null) {
+            return Optional.empty();
+        }
+        KuduFilterInfo.Builder builder = KuduFilterInfo.Builder.create(columnName);
+        // GREATER GREATER_EQUAL EQUAL LESS LESS_EQUAL
+        if (functionDefinition.equals(BuiltInFunctionDefinitions.GREATER_THAN)) {
+            return Optional.of(builder.greaterThan(value).build());
+        } else if (functionDefinition.equals(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL)) {
+            return Optional.of(builder.greaterOrEqualTo(value).build());
+        } else if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
+            return Optional.of(builder.equalTo(value).build());
+        } else if (functionDefinition.equals(BuiltInFunctionDefinitions.LESS_THAN)) {
+            return Optional.of(builder.lessThan(value).build());
+        } else if (functionDefinition.equals(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL)) {
+            return Optional.of(builder.lessOrEqualTo(value).build());
+        }
+        return Optional.empty();
+    }
+
+    private static Optional<KuduFilterInfo> convertIsInExpression(List<Expression> children) {
+        // IN operation will be: or(equals(field, value1), equals(field, value2), ...) in blink
+        // For FilterType IS_IN, all internal CallExpression's function need to be equals and
+        // fields need to be same
+        List<Object> values = new ArrayList<>(children.size());
+        String columnName = "";
+        for (int i = 0; i < children.size(); i++) {
+            if (children.get(i) instanceof CallExpression) {
+                CallExpression callExpression = (CallExpression) children.get(i);
+                FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
+                List<Expression> subChildren = callExpression.getChildren();
+                FieldReferenceExpression fieldReferenceExpression;
+                ValueLiteralExpression valueLiteralExpression;
+                if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS) &&
+                    subChildren.size() == 2 && isFieldReferenceExpression(subChildren.get(0)) &&
+                    isValueLiteralExpression(subChildren.get(1))) {
+                    fieldReferenceExpression = (FieldReferenceExpression) subChildren.get(0);
+                    valueLiteralExpression = (ValueLiteralExpression) subChildren.get(1);
+                    String fieldName = fieldReferenceExpression.getName();
+                    if (i != 0 && !columnName.equals(fieldName)) {
+                        return Optional.empty();
+                    } else {
+                        columnName = fieldName;
+                    }
+                    Object value = extractValueLiteral(fieldReferenceExpression,
+                        valueLiteralExpression);
+                    if (value == null) {
+                        return Optional.empty();
+                    }
+                    values.add(i, value);
+                } else {
+                   return Optional.empty();
+                }
+            } else {
+                return Optional.empty();
+            }
+        }
+        KuduFilterInfo.Builder builder = KuduFilterInfo.Builder.create(columnName);
+        return Optional.of(builder.isIn(values).build());
+    }
+
+    private static Object extractValueLiteral(FieldReferenceExpression fieldReferenceExpression,
+        ValueLiteralExpression valueLiteralExpression) {
+        DataType fieldType = fieldReferenceExpression.getOutputDataType();
+        return valueLiteralExpression.getValueAs(fieldType.getConversionClass()).orElse(null);
+    }
 }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
index f694108..3cef2d8 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java
@@ -65,7 +65,7 @@ public class KuduCatalogTest extends KuduTestBase {
     public void init() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         catalog = new KuduCatalog(harness.getMasterAddressesAsString());
-        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
+        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
         tableEnv.registerCatalog("kudu", catalog);
         tableEnv.useCatalog("kudu");
     }
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
index 398ba9f..3a2f776 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java
@@ -42,7 +42,7 @@ public class KuduTableFactoryTest extends KuduTestBase {
     @BeforeEach
     public void init() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
-        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode(env);
+        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerStreamingMode(env);
         kuduMasters = harness.getMasterAddressesAsString();
     }
 
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
new file mode 100644
index 0000000..f5939fc
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableUtils;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link KuduTableSource}.
+ */
+public class KuduTableSourceITCase extends KuduTestBase {
+    private TableEnvironment tableEnv;
+    private KuduCatalog catalog;
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        tableEnv = KuduTableTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        tableEnv.registerCatalog("kudu", catalog);
+        tableEnv.useCatalog("kudu");
+    }
+
+    @Test
+    void testFullBatchScan() throws Exception {
+        Table query = tableEnv.sqlQuery("select * from books order by id");
+        List<Row> results = TableUtils.collectToList(query);
+        assertEquals(5, results.size());
+        assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString());
+        tableEnv.sqlUpdate("DROP TABLE books");
+    }
+
+    @Test
+    void testScanWithProjectionAndFilter() throws Exception {
+        // (price > 30 and price < 40)
+        Table table = tableEnv.sqlQuery("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40");
+        List<Row> results = TableUtils.collectToList(table);
+        assertEquals(1, results.size());
+        assertEquals("More Java for more dummies", results.get(0).toString());
+        tableEnv.sqlUpdate("DROP TABLE books");
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
new file mode 100644
index 0000000..f4bb6ae
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.table;
+
+import org.apache.flink.connectors.kudu.connector.KuduTableInfo;
+import org.apache.flink.connectors.kudu.connector.KuduTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.FieldReferenceExpression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit Tests for {@link KuduTableSource}.
+ */
+public class KuduTableSourceTest extends KuduTestBase {
+    private KuduTableSource kuduTableSource;
+    private KuduCatalog catalog;
+
+    private static final ScalarFunction DUMMY_FUNCTION = new ScalarFunction() {
+        // dummy
+    };
+
+    @BeforeEach
+    public void init() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        setUpDatabase(tableInfo);
+        catalog = new KuduCatalog(harness.getMasterAddressesAsString());
+        ObjectPath op = new ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "books");
+        try {
+            kuduTableSource = catalog.getKuduTableFactory().createTableSource(op, catalog.getTable(op));
+        } catch (TableNotExistException e) {
+            fail(e.getMessage());
+        }
+    }
+
+    @AfterEach
+    public void clean() {
+        KuduTableInfo tableInfo = booksTableInfo("books", true);
+        cleanDatabase(tableInfo);
+    }
+
+    @Test
+    void testGetTableSchema() throws Exception {
+        TableSchema tableSchema = kuduTableSource.getTableSchema();
+        assertNotNull(tableSchema);
+        assertArrayEquals(getFieldNames(), tableSchema.getFieldNames());
+        assertArrayEquals(getFieldDataTypes(), tableSchema.getFieldDataTypes());
+
+    }
+
+    @Test
+    void testGetProducedDataType() throws Exception {
+        DataType producedDataType = kuduTableSource.getProducedDataType();
+        assertNotNull(producedDataType);
+        assertEquals(getReturnDataType(getFieldNames(), getFieldDataTypes()), producedDataType);
+    }
+
+    @Test
+    void testProjectFields() throws Exception {
+        KuduTableSource projectedTableSource = (KuduTableSource) kuduTableSource.projectFields(
+            new int[]{3, 4, 1});
+        // ensure copy is returned
+        assertTrue(kuduTableSource != projectedTableSource);
+        // ensure table schema is identical
+        assertEquals(kuduTableSource.getTableSchema(), projectedTableSource.getTableSchema());
+        // ensure IF is configured with selected fields
+        String[] fieldNames = getFieldNames();
+        DataType[] fieldDataTypes = getFieldDataTypes();
+        String[] projectedFieldNames = new String[] {fieldNames[3], fieldNames[4], fieldNames[1]};
+        DataType[] projectedDataTypes = new DataType[] {fieldDataTypes[3], fieldDataTypes[4],
+            fieldDataTypes[1]};
+        assertEquals(getReturnDataType(projectedFieldNames, projectedDataTypes),
+            projectedTableSource.getProducedDataType());
+    }
+
+    @Test
+    void testApplyPredicate() throws Exception {
+        // expressions for supported predicates
+        FieldReferenceExpression fieldReferenceExpression = new FieldReferenceExpression(
+            "id", DataTypes.INT(), 0, 0);
+        ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression(
+            1, DataTypes.INT());
+        List<ResolvedExpression> args = new ArrayList<>(
+            Arrays.asList(fieldReferenceExpression, valueLiteralExpression));
+        Expression supportedPred = new CallExpression(
+            EQUALS,
+            args,
+            DataTypes.BOOLEAN());
+        // unsupported predicate
+        Expression unsupportedPred = new CallExpression(
+            new ScalarFunctionDefinition("dummy", DUMMY_FUNCTION),
+            singletonList(new ValueLiteralExpression(1, DataTypes.INT())),
+            DataTypes.INT());
+        // invalid predicate
+        Expression invalidPred = new CallExpression(
+            AND,
+            Collections.emptyList(),
+            DataTypes.ARRAY(DataTypes.INT()));
+
+        ArrayList<Expression> preds = new ArrayList<>(
+            Arrays.asList(supportedPred, unsupportedPred, invalidPred));
+        // apply predicates on TableSource
+        KuduTableSource filteredTableSource = (KuduTableSource) kuduTableSource.applyPredicate(preds);
+        // ensure the unable push down expressions are reserved
+        assertEquals(preds.size(), 2);
+        assertSame(unsupportedPred, preds.get(0));
+        assertSame(invalidPred, preds.get(1));
+        // ensure copy is returned
+        assertNotSame(kuduTableSource, filteredTableSource);
+        // ensure table schema is identical
+        assertEquals(kuduTableSource.getTableSchema(), filteredTableSource.getTableSchema());
+        // ensure return type is identical
+        assertEquals(kuduTableSource.getProducedDataType(), filteredTableSource.getProducedDataType());
+        // ensure filter pushdown is correct
+        assertTrue(filteredTableSource.isFilterPushedDown());
+        assertFalse(kuduTableSource.isFilterPushedDown());
+    }
+
+    private String[] getFieldNames() {
+        return new String[] {
+            "id", "title", "author", "price", "quantity"
+        };
+    }
+
+    private DataType[] getFieldDataTypes() {
+        return new DataType[]{
+            DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DOUBLE(), DataTypes.INT(),
+        };
+    }
+
+    private DataType getReturnDataType(String[] fieldNames, DataType[] dataTypes) {
+        DataTypes.Field[] fields = new DataTypes.Field[fieldNames.length];
+        for (int i = 0; i < fieldNames.length; i++) {
+            fields[i] = DataTypes.FIELD(fieldNames[i], dataTypes[i]);
+        }
+        return DataTypes.ROW(fields);
+    }
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
index b522c51..54854fb 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java
@@ -18,16 +18,24 @@ package org.apache.flink.connectors.kudu.table;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
 
 public class KuduTableTestUtils {
 
-    public static StreamTableEnvironment createTableEnvWithBlinkPlannerBatchMode(StreamExecutionEnvironment env) {
+    /**
+     * Creates a Blink-planner {@link StreamTableEnvironment} in streaming mode on top of the given
+     * {@link StreamExecutionEnvironment}, with {@code TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM} set to 1.
+     */
+    public static StreamTableEnvironment createTableEnvWithBlinkPlannerStreamingMode(StreamExecutionEnvironment env) {
         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
         tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
         return tableEnv;
     }
+
+    /**
+     * Creates a standalone Blink-planner {@link TableEnvironment} in batch mode, with
+     * {@code TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM} set to 1.
+     */
+    public static TableEnvironment createTableEnvWithBlinkPlannerBatchMode() {
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
+        TableEnvironment tableEnv = TableEnvironment.create(settings);
+        tableEnv.getConfig().getConfiguration().setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+        return tableEnv;
+    }
+
+    private KuduTableTestUtils() {
+        // holder for static factory methods only; not meant to be instantiated
+    }
 }