You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/06 08:21:04 UTC

[flink-table-store] branch master updated: [FLINK-29154] Support LookupTableSource for table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e55dde87 [FLINK-29154] Support LookupTableSource for table store
e55dde87 is described below

commit e55dde87296b51132d37fb4265bd3c863f05db40
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Sep 6 16:20:58 2022 +0800

    [FLINK-29154] Support LookupTableSource for table store
    
    This closes #281
---
 docs/content/docs/development/configuration.md     |   2 +-
 docs/content/docs/development/lookup-join.md       |  93 +++++
 docs/content/docs/development/rescale-bucket.md    |   2 +-
 docs/content/docs/development/roadmap.md           |   2 +-
 .../generated/rocksdb_configuration.html           | 114 ++++++
 .../store/file/predicate/PredicateBuilder.java     |   6 +
 .../store/file/predicate/PredicateConverter.java   |  21 +-
 .../store/file/predicate/PredicateFilter.java      |  42 ++
 .../table/store/utils/KeyProjectedRowData.java     | 163 ++++++++
 flink-table-store-connector/pom.xml                |   6 +
 .../table/store/connector/RocksDBOptions.java      | 349 +++++++++++++++++
 .../connector/lookup/FileStoreLookupFunction.java  | 216 +++++++++++
 .../table/store/connector/lookup/LookupTable.java  |  53 +++
 .../connector/lookup/PrimaryKeyLookupTable.java    |  88 +++++
 .../store/connector/lookup/RocksDBSetState.java    | 124 ++++++
 .../table/store/connector/lookup/RocksDBState.java | 130 +++++++
 .../connector/lookup/RocksDBStateFactory.java      | 100 +++++
 .../store/connector/lookup/RocksDBValueState.java  | 102 +++++
 .../lookup/SecondaryIndexLookupTable.java          |  97 +++++
 .../source/ContinuousFileSplitEnumerator.java      |  76 +---
 .../store/connector/source/FileStoreSource.java    |   2 +-
 .../store/connector/source/TableStoreSource.java   |  21 +
 .../table/store/connector/LookupJoinITCase.java    | 425 +++++++++++++++++++++
 .../store/connector/lookup/LookupTableTest.java    | 170 +++++++++
 .../store/table/source/SnapshotEnumerator.java     |  97 +++++
 .../store/table/source/TableStreamingReader.java   | 123 ++++++
 .../file/predicate/PredicateConverterTest.java     |  14 +-
 27 files changed, 2553 insertions(+), 85 deletions(-)

diff --git a/docs/content/docs/development/configuration.md b/docs/content/docs/development/configuration.md
index 5d15dc35..2b6ed92a 100644
--- a/docs/content/docs/development/configuration.md
+++ b/docs/content/docs/development/configuration.md
@@ -1,6 +1,6 @@
 ---
 title: "Configuration"
-weight: 7
+weight: 8
 type: docs
 aliases:
 - /development/configuration.html
diff --git a/docs/content/docs/development/lookup-join.md b/docs/content/docs/development/lookup-join.md
new file mode 100644
index 00000000..85ed8a3b
--- /dev/null
+++ b/docs/content/docs/development/lookup-join.md
@@ -0,0 +1,93 @@
+---
+title: "Lookup Join"
+weight: 6
+type: docs
+aliases:
+- /development/lookup-join.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Lookup Join
+
+A [Lookup Join](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/)
+is used to enrich a table with data that is queried from Flink Table Store. The join requires one table to have
+a processing time attribute and the other table to be backed by a lookup source connector.
+
+First, create a table, and update it in real-time.
+
+```sql
+-- Create a table store catalog
+CREATE CATALOG my_catalog WITH (
+  'type'='table-store',
+  'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file:///tmp/foo/bar'
+);
+
+USE CATALOG my_catalog;
+
+-- Create a table in table-store catalog
+CREATE TABLE customers (
+  id INT PRIMARY KEY NOT ENFORCED,
+  name STRING,
+  country STRING,
+  zip STRING
+);
+
+-- Launch a streaming job to update customers table
+INSERT INTO customers ...
+```
+
+Then, you can use this table in lookup join.
+
+```sql
+-- enrich each order with customer information
+SELECT o.order_id, o.total, c.country, c.zip
+FROM Orders AS o
+         JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
+              ON o.customer_id = c.id;
+```
+
+The lookup join node maintains a RocksDB cache locally and pulls the latest updates
+of the table in real time. Only the necessary data is pulled, so your filter conditions
+are also very important.
+
+In order to avoid excessive use of local disks, the lookup join feature is only suitable
+for table sizes below tens of millions.
+
+{{< hint info >}}
+__Note:__ Partitioned tables and tables without a primary key are not supported yet.
+{{< /hint >}}
+
+Projection pushdown can effectively reduce the overhead.
+[FLINK-29138](https://issues.apache.org/jira/browse/FLINK-29138) fixed a bug that prevented
+the projection from being pushed down to the source, so it is preferable to use a Flink
+version greater than or equal to `1.14.6` (1.14.x) or `1.15.3` (1.15.x). Alternatively, you
+can cherry-pick the commit to your own Flink branch.
+
+## RocksDBOptions
+
+These options configure the RocksDB cache. You can set them in the `WITH` clause or through dynamic table hints.
+
+```sql
+SELECT o.order_id, o.total, c.country, c.zip
+  FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
+  FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.id;
+```
+
+{{< generated/rocksdb_configuration >}}
diff --git a/docs/content/docs/development/rescale-bucket.md b/docs/content/docs/development/rescale-bucket.md
index c937256a..6e688e9f 100644
--- a/docs/content/docs/development/rescale-bucket.md
+++ b/docs/content/docs/development/rescale-bucket.md
@@ -1,6 +1,6 @@
 ---
 title: "Rescale Bucket"
-weight: 6
+weight: 7
 type: docs
 aliases:
 - /development/rescale-bucket.html
diff --git a/docs/content/docs/development/roadmap.md b/docs/content/docs/development/roadmap.md
index f88ce6bf..9c335948 100644
--- a/docs/content/docs/development/roadmap.md
+++ b/docs/content/docs/development/roadmap.md
@@ -1,6 +1,6 @@
 ---
 title: "Roadmap"
-weight: 8
+weight: 9
 type: docs
 aliases:
 - /development/roadmap.html
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
new file mode 100644
index 00000000..d6d409db
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -0,0 +1,114 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>lookup.cache-rows</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Long</td>
+            <td>The maximum number of rows to store in the cache</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.block.blocksize</h5></td>
+            <td style="word-wrap: break-word;">4 kb</td>
+            <td>MemorySize</td>
+            <td>The approximate size (in bytes) of user data packed per block. The default blocksize is '4KB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.block.cache-size</h5></td>
+            <td style="word-wrap: break-word;">8 mb</td>
+            <td>MemorySize</td>
+            <td>The amount of the cache for data blocks in RocksDB. The default block-cache size is '8MB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.block.metadata-blocksize</h5></td>
+            <td style="word-wrap: break-word;">4 kb</td>
+            <td>MemorySize</td>
+            <td>Approximate size of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. The default blocksize is '4KB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.bloom-filter.bits-per-key</h5></td>
+            <td style="word-wrap: break-word;">10.0</td>
+            <td>Double</td>
+            <td>Bits per key that bloom filter will use, this only take effect when bloom filter is used. The default value is 10.0.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.bloom-filter.block-based-mode</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. The default value is 'false'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.level.max-size-level-base</h5></td>
+            <td style="word-wrap: break-word;">256 mb</td>
+            <td>MemorySize</td>
+            <td>The upper-bound of the total size of level base files in bytes. The default value is '256MB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.level.target-file-size-base</h5></td>
+            <td style="word-wrap: break-word;">64 mb</td>
+            <td>MemorySize</td>
+            <td>The target file size for compaction, which determines a level-1 file size. The default value is '64MB'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.level.use-dynamic-size</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, RocksDB will pick target size of each level dynamically. From an empty DB, RocksDB would make last level the base level, which means merging L0 data into the last level, until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. The default value is 'false'. For more information, please refer to <a href="https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true">RocksDB's [...]
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compaction.style</h5></td>
+            <td style="word-wrap: break-word;">LEVEL</td>
+            <td><p>Enum</p></td>
+            <td>The specified compaction style for DB. Candidate compaction style is LEVEL, FIFO, UNIVERSAL or NONE, and Flink chooses 'LEVEL' as default style.<br /><br />Possible values:<ul><li>"LEVEL"</li><li>"UNIVERSAL"</li><li>"FIFO"</li><li>"NONE"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.compression.type</h5></td>
+            <td style="word-wrap: break-word;">LZ4_COMPRESSION</td>
+            <td><p>Enum</p></td>
+            <td>The compression type.<br /><br />Possible values:<ul><li>"NO_COMPRESSION"</li><li>"SNAPPY_COMPRESSION"</li><li>"ZLIB_COMPRESSION"</li><li>"BZLIB2_COMPRESSION"</li><li>"LZ4_COMPRESSION"</li><li>"LZ4HC_COMPRESSION"</li><li>"XPRESS_COMPRESSION"</li><li>"ZSTD_COMPRESSION"</li><li>"DISABLE_COMPRESSION_OPTION"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.files.open</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Integer</td>
+            <td>The maximum number of open files (per stateful operator) that can be used by the DB, '-1' means no limit. The default value is '-1'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.thread.num</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>The maximum number of concurrent background flush and compaction jobs (per stateful operator). The default value is '2'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.use-bloom-filter</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, every newly created SST file will contain a Bloom filter. It is disabled by default.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.writebuffer.count</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>The maximum number of write buffers that are built up in memory. The default value is '2'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.writebuffer.number-to-merge</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The minimum number of write buffers that will be merged together before writing to storage. The default value is '1'.</td>
+        </tr>
+        <tr>
+            <td><h5>rocksdb.writebuffer.size</h5></td>
+            <td style="word-wrap: break-word;">64 mb</td>
+            <td>MemorySize</td>
+            <td>The amount of data built up in memory (backed by an unsorted log on disk) before converting to a sorted on-disk files. The default writebuffer size is '64MB'.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index a1b86a2d..cdaff153 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -49,9 +49,15 @@ import static java.util.Collections.singletonList;
 public class PredicateBuilder {
 
     private final RowType rowType;
+    private final List<String> fieldNames;
 
     public PredicateBuilder(RowType rowType) {
         this.rowType = rowType;
+        this.fieldNames = rowType.getFieldNames();
+    }
+
+    public int indexOf(String field) {
+        return fieldNames.indexOf(field);
     }
 
     public Predicate equal(int idx, Object literal) {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
index baaf768d..9cdfcaf6 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateConverter.java
@@ -47,7 +47,12 @@ import java.util.regex.Pattern;
 import static org.apache.flink.table.data.conversion.DataStructureConverters.getConverter;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
 
-/** Convert {@link Expression} to {@link Predicate}. */
+/**
+ * Convert {@link Expression} to {@link Predicate}.
+ *
+ * <p>A {@link FieldReferenceExpression} is resolved by field name instead of by index: if project
+ * pushdown happens before filter pushdown, the field indexes seen by the filter are already projected.
+ */
 public class PredicateConverter implements ExpressionVisitor<Predicate> {
 
     private final PredicateBuilder builder;
@@ -91,15 +96,17 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
             for (int i = 1; i < children.size(); i++) {
                 literals.add(extractLiteral(fieldRefExpr.getOutputDataType(), children.get(i)));
             }
-            return builder.in(fieldRefExpr.getInputIndex(), literals);
+            return builder.in(builder.indexOf(fieldRefExpr.getName()), literals);
         } else if (func == BuiltInFunctionDefinitions.IS_NULL) {
             return extractFieldReference(children.get(0))
-                    .map(FieldReferenceExpression::getFieldIndex)
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
                     .map(builder::isNull)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.IS_NOT_NULL) {
             return extractFieldReference(children.get(0))
-                    .map(FieldReferenceExpression::getFieldIndex)
+                    .map(FieldReferenceExpression::getName)
+                    .map(builder::indexOf)
                     .map(builder::isNotNull)
                     .orElseThrow(UnsupportedExpression::new);
         } else if (func == BuiltInFunctionDefinitions.LIKE) {
@@ -164,7 +171,7 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
                     Matcher beginMatcher = BEGIN_PATTERN.matcher(escapedSqlPattern);
                     if (beginMatcher.matches()) {
                         return builder.startsWith(
-                                fieldRefExpr.getFieldIndex(),
+                                builder.indexOf(fieldRefExpr.getName()),
                                 BinaryStringData.fromString(beginMatcher.group(1)));
                     }
                 }
@@ -184,13 +191,13 @@ public class PredicateConverter implements ExpressionVisitor<Predicate> {
         if (fieldRefExpr.isPresent()) {
             Object literal =
                     extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
-            return visit1.apply(fieldRefExpr.get().getFieldIndex(), literal);
+            return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
         } else {
             fieldRefExpr = extractFieldReference(children.get(1));
             if (fieldRefExpr.isPresent()) {
                 Object literal =
                         extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(0));
-                return visit2.apply(fieldRefExpr.get().getFieldIndex(), literal);
+                return visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
             }
         }
 
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
new file mode 100644
index 00000000..2c4f0528
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/file/predicate/PredicateFilter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+/** A {@link java.util.function.Predicate} over {@link RowData}: each row is converted with a {@link RowDataToObjectArrayConverter} and tested against the wrapped {@link Predicate}; a null predicate accepts every row. */
+public class PredicateFilter implements java.util.function.Predicate<RowData> {
+
+    private final RowDataToObjectArrayConverter arrayConverter;
+    @Nullable private final Predicate predicate;
+
+    public PredicateFilter(RowType rowType, @Nullable Predicate predicate) {
+        this.arrayConverter = new RowDataToObjectArrayConverter(rowType);
+        this.predicate = predicate;
+    }
+
+    @Override
+    public boolean test(RowData rowData) {
+        return predicate == null || predicate.test(arrayConverter.convert(rowData));
+    }
+}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/KeyProjectedRowData.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/KeyProjectedRowData.java
new file mode 100644
index 00000000..c3139a81
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/KeyProjectedRowData.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+
+/** A {@link RowData} view that projects (key) fields of a wrapped row through {@code indexMapping} and always reports {@link RowKind#INSERT}; set the wrapped row with {@link #replaceRow} before reading, and note that equals/hashCode throw. */
+public class KeyProjectedRowData implements RowData {
+
+    private final int[] indexMapping;
+
+    private RowData row;
+
+    public KeyProjectedRowData(int[] indexMapping) {
+        this.indexMapping = indexMapping;
+    }
+
+    public KeyProjectedRowData replaceRow(RowData row) {
+        this.row = row;
+        return this;
+    }
+
+    @Override
+    public int getArity() {
+        return indexMapping.length;
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return RowKind.INSERT;
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        throw new UnsupportedOperationException("Key row data should always be insert only.");
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(indexMapping[pos]);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return row.getBoolean(indexMapping[pos]);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return row.getByte(indexMapping[pos]);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return row.getShort(indexMapping[pos]);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return row.getInt(indexMapping[pos]);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return row.getLong(indexMapping[pos]);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return row.getFloat(indexMapping[pos]);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return row.getDouble(indexMapping[pos]);
+    }
+
+    @Override
+    public StringData getString(int pos) {
+        return row.getString(indexMapping[pos]);
+    }
+
+    @Override
+    public DecimalData getDecimal(int pos, int precision, int scale) {
+        return row.getDecimal(indexMapping[pos], precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int pos, int precision) {
+        return row.getTimestamp(indexMapping[pos], precision);
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int pos) {
+        return row.getRawValue(indexMapping[pos]);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return row.getBinary(indexMapping[pos]);
+    }
+
+    @Override
+    public ArrayData getArray(int pos) {
+        return row.getArray(indexMapping[pos]);
+    }
+
+    @Override
+    public MapData getMap(int pos) {
+        return row.getMap(indexMapping[pos]);
+    }
+
+    @Override
+    public RowData getRow(int pos, int numFields) {
+        return row.getRow(indexMapping[pos], numFields);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        throw new UnsupportedOperationException("Projected row data cannot be compared");
+    }
+
+    @Override
+    public int hashCode() {
+        throw new UnsupportedOperationException("Projected row data cannot be hashed");
+    }
+
+    @Override
+    public String toString() {
+        return getRowKind().shortString()
+                + "{"
+                + "indexMapping="
+                + Arrays.toString(indexMapping)
+                + ", mutableRow="
+                + row
+                + '}';
+    }
+}
diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
index 45db1b95..aab108e4 100644
--- a/flink-table-store-connector/pom.xml
+++ b/flink-table-store-connector/pom.xml
@@ -68,6 +68,12 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>frocksdbjni</artifactId>
+            <version>6.20.3-ververica-1.0</version>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/RocksDBOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/RocksDBOptions.java
new file mode 100644
index 00000000..2889abd1
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/RocksDBOptions.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.TableFormatConfig;
+
+import java.io.File;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.rocksdb.CompactionStyle.FIFO;
+import static org.rocksdb.CompactionStyle.LEVEL;
+import static org.rocksdb.CompactionStyle.NONE;
+import static org.rocksdb.CompactionStyle.UNIVERSAL;
+import static org.rocksdb.CompressionType.LZ4_COMPRESSION;
+import static org.rocksdb.InfoLogLevel.INFO_LEVEL;
+
+/**
+ * Options for rocksdb. Copied from flink {@code RocksDBConfigurableOptions}.
+ *
+ * <p>Besides the {@link ConfigOption} definitions, this class offers {@link
+ * #createDBOptions(DBOptions, Configuration)} and {@link
+ * #createColumnOptions(ColumnFamilyOptions, Configuration)} which copy the configured values onto
+ * the corresponding RocksDB option objects.
+ */
+public class RocksDBOptions {
+
+    /**
+     * Row limit of the lookup cache; read by {@code FileStoreLookupFunction#open} and handed to
+     * {@code LookupTable#create} as the LRU cache size.
+     */
+    public static final ConfigOption<Long> LOOKUP_CACHE_ROWS =
+            key("lookup.cache-rows")
+                    .longType()
+                    .defaultValue(10_000L)
+                    .withDescription("The maximum number of rows to store in the cache");
+
+    // --------------------------------------------------------------------------
+    // Provided configurable DBOptions within Flink
+    // (applied by createDBOptions)
+    // --------------------------------------------------------------------------
+
+    public static final ConfigOption<Integer> MAX_BACKGROUND_THREADS =
+            key("rocksdb.thread.num")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription(
+                            "The maximum number of concurrent background flush and compaction jobs (per stateful operator). "
+                                    + "The default value is '2'.");
+
+    public static final ConfigOption<Integer> MAX_OPEN_FILES =
+            key("rocksdb.files.open")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The maximum number of open files (per stateful operator) that can be used by the DB, '-1' means no limit. "
+                                    + "The default value is '-1'.");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<MemorySize> LOG_MAX_FILE_SIZE =
+            key("rocksdb.log.max-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("25mb"))
+                    .withDescription(
+                            "The maximum size of RocksDB's file used for information logging. "
+                                    + "If the log files becomes larger than this, a new file will be created. "
+                                    + "If 0, all logs will be written to one log file. "
+                                    + "The default maximum file size is '25MB'. ");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<Integer> LOG_FILE_NUM =
+            key("rocksdb.log.file-num")
+                    .intType()
+                    .defaultValue(4)
+                    .withDescription(
+                            "The maximum number of files RocksDB should keep for information logging (Default setting: 4).");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<String> LOG_DIR =
+            key("rocksdb.log.dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The directory for RocksDB's information logging files. "
+                                    + "If empty (Flink default setting), log files will be in the same directory as the Flink log. "
+                                    + "If non-empty, this directory will be used and the data directory's absolute path will be used as the prefix of the log file name.");
+
+    @Documentation.ExcludeFromDocumentation("Internal use only")
+    public static final ConfigOption<InfoLogLevel> LOG_LEVEL =
+            key("rocksdb.log.level")
+                    .enumType(InfoLogLevel.class)
+                    .defaultValue(INFO_LEVEL)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The specified information logging level for RocksDB. "
+                                                    + "If unset, Flink will use %s.",
+                                            code(INFO_LEVEL.name()))
+                                    .linebreak()
+                                    .text(
+                                            "Note: RocksDB info logs will not be written to the TaskManager logs and there "
+                                                    + "is no rolling strategy, unless you configure %s, %s, and %s accordingly. "
+                                                    + "Without a rolling strategy, long-running tasks may lead to uncontrolled "
+                                                    + "disk space usage if configured with increased log levels!",
+                                            code(LOG_DIR.key()),
+                                            code(LOG_MAX_FILE_SIZE.key()),
+                                            code(LOG_FILE_NUM.key()))
+                                    .linebreak()
+                                    .text(
+                                            "There is no need to modify the RocksDB log level, unless for troubleshooting RocksDB.")
+                                    .build());
+
+    // --------------------------------------------------------------------------
+    // Provided configurable ColumnFamilyOptions within Flink
+    // (applied by createColumnOptions)
+    // --------------------------------------------------------------------------
+
+    public static final ConfigOption<CompressionType> COMPRESSION_TYPE =
+            key("rocksdb.compression.type")
+                    .enumType(CompressionType.class)
+                    .defaultValue(LZ4_COMPRESSION)
+                    .withDescription("The compression type.");
+
+    public static final ConfigOption<CompactionStyle> COMPACTION_STYLE =
+            key("rocksdb.compaction.style")
+                    .enumType(CompactionStyle.class)
+                    .defaultValue(LEVEL)
+                    .withDescription(
+                            String.format(
+                                    "The specified compaction style for DB. Candidate compaction style is %s, %s, %s or %s, "
+                                            + "and Flink chooses '%s' as default style.",
+                                    LEVEL.name(),
+                                    FIFO.name(),
+                                    UNIVERSAL.name(),
+                                    NONE.name(),
+                                    LEVEL.name()));
+
+    public static final ConfigOption<Boolean> USE_DYNAMIC_LEVEL_SIZE =
+            key("rocksdb.compaction.level.use-dynamic-size")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If true, RocksDB will pick target size of each level dynamically. From an empty DB, ")
+                                    .text(
+                                            "RocksDB would make last level the base level, which means merging L0 data into the last level, ")
+                                    .text(
+                                            "until it exceeds max_bytes_for_level_base. And then repeat this process for second last level and so on. ")
+                                    .text("The default value is 'false'. ")
+                                    .text(
+                                            "For more information, please refer to %s",
+                                            link(
+                                                    "https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true",
+                                                    "RocksDB's doc."))
+                                    .build());
+
+    public static final ConfigOption<MemorySize> TARGET_FILE_SIZE_BASE =
+            key("rocksdb.compaction.level.target-file-size-base")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64mb"))
+                    .withDescription(
+                            "The target file size for compaction, which determines a level-1 file size. "
+                                    + "The default value is '64MB'.");
+
+    public static final ConfigOption<MemorySize> MAX_SIZE_LEVEL_BASE =
+            key("rocksdb.compaction.level.max-size-level-base")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("256mb"))
+                    .withDescription(
+                            "The upper-bound of the total size of level base files in bytes. "
+                                    + "The default value is '256MB'.");
+
+    public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
+            key("rocksdb.writebuffer.size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64mb"))
+                    .withDescription(
+                            "The amount of data built up in memory (backed by an unsorted log on disk) "
+                                    + "before converting to a sorted on-disk files. The default writebuffer size is '64MB'.");
+
+    public static final ConfigOption<Integer> MAX_WRITE_BUFFER_NUMBER =
+            key("rocksdb.writebuffer.count")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription(
+                            "The maximum number of write buffers that are built up in memory. "
+                                    + "The default value is '2'.");
+
+    public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE =
+            key("rocksdb.writebuffer.number-to-merge")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The minimum number of write buffers that will be merged together before writing to storage. "
+                                    + "The default value is '1'.");
+
+    public static final ConfigOption<MemorySize> BLOCK_SIZE =
+            key("rocksdb.block.blocksize")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("4kb"))
+                    .withDescription(
+                            "The approximate size (in bytes) of user data packed per block. "
+                                    + "The default blocksize is '4KB'.");
+
+    public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+            key("rocksdb.block.metadata-blocksize")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("4kb"))
+                    .withDescription(
+                            "Approximate size of partitioned metadata packed per block. "
+                                    + "Currently applied to indexes block when partitioned index/filters option is enabled. "
+                                    + "The default blocksize is '4KB'.");
+
+    public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
+            key("rocksdb.block.cache-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("8mb"))
+                    .withDescription(
+                            "The amount of the cache for data blocks in RocksDB. "
+                                    + "The default block-cache size is '8MB'.");
+
+    public static final ConfigOption<Boolean> USE_BLOOM_FILTER =
+            key("rocksdb.use-bloom-filter")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, every newly created SST file will contain a Bloom filter. "
+                                    + "It is disabled by default.");
+
+    public static final ConfigOption<Double> BLOOM_FILTER_BITS_PER_KEY =
+            key("rocksdb.bloom-filter.bits-per-key")
+                    .doubleType()
+                    .defaultValue(10.0)
+                    .withDescription(
+                            "Bits per key that bloom filter will use, this only take effect when bloom filter is used. "
+                                    + "The default value is 10.0.");
+
+    public static final ConfigOption<Boolean> BLOOM_FILTER_BLOCK_BASED_MODE =
+            key("rocksdb.bloom-filter.block-based-mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, RocksDB will use block-based filter instead of full filter, this only take effect when bloom filter is used. "
+                                    + "The default value is 'false'.");
+
+    /**
+     * Copies the DB-level settings from {@code options} onto {@code currentOptions}.
+     *
+     * <p>Sets the background job count, the open-file limit and the info-log level, file size and
+     * file count. When {@link #LOG_DIR} is unset or empty, the log directory is relocated next to
+     * the file named by the {@code log.file} system property, if that can be resolved.
+     *
+     * @param currentOptions the options object to mutate
+     * @param options the configuration the values are read from
+     * @return the same {@code currentOptions} instance
+     */
+    public static DBOptions createDBOptions(DBOptions currentOptions, Configuration options) {
+        currentOptions.setMaxBackgroundJobs(options.get(MAX_BACKGROUND_THREADS));
+        currentOptions.setMaxOpenFiles(options.get(MAX_OPEN_FILES));
+        currentOptions.setInfoLogLevel(options.get(LOG_LEVEL));
+
+        String logDir = options.get(LOG_DIR);
+        if (logDir == null || logDir.isEmpty()) {
+            relocateDefaultDbLogDir(currentOptions);
+        } else {
+            currentOptions.setDbLogDir(logDir);
+        }
+
+        currentOptions.setMaxLogFileSize(options.get(LOG_MAX_FILE_SIZE).getBytes());
+        currentOptions.setKeepLogFileNum(options.get(LOG_FILE_NUM));
+        return currentOptions;
+    }
+
+    /**
+     * Relocates the default log directory of RocksDB with the Flink log directory. Finds the Flink
+     * log directory using log.file Java property that is set during startup.
+     *
+     * <p>Nothing is changed when the property is missing, when the log file or its directory does
+     * not exist or is unreadable, or when the log file path has no directory component.
+     *
+     * @param dbOptions The RocksDB {@link DBOptions}.
+     */
+    private static void relocateDefaultDbLogDir(DBOptions dbOptions) {
+        String logFilePath = System.getProperty("log.file");
+        if (logFilePath == null) {
+            return;
+        }
+
+        File logFile = resolveFileLocation(logFilePath);
+        if (logFile == null) {
+            return;
+        }
+
+        // getParent() is null for a path without a directory component (e.g. "flink.log");
+        // handing that to resolveFileLocation would fail with a NullPointerException.
+        String logDir = logFile.getParent();
+        if (logDir != null && resolveFileLocation(logDir) != null) {
+            dbOptions.setDbLogDir(logDir);
+        }
+    }
+
+    /**
+     * Verify log file location.
+     *
+     * @param logFilePath Path to log file
+     * @return File or null if not a valid log file
+     */
+    private static File resolveFileLocation(String logFilePath) {
+        File logFile = new File(logFilePath);
+        return (logFile.exists() && logFile.canRead()) ? logFile : null;
+    }
+
+    /**
+     * Copies the column-family settings from {@code options} onto {@code currentOptions}.
+     *
+     * <p>Compression, compaction and write-buffer settings are always applied. The block size,
+     * block cache and bloom filter settings are only applied when the current table format is not
+     * a {@link PlainTableConfig}; in that case the options are returned right after the common
+     * settings have been set.
+     *
+     * @param currentOptions the options object to mutate
+     * @param options the configuration the values are read from
+     * @return {@code currentOptions}, or whatever {@code setTableFormatConfig} returns for it
+     */
+    public static ColumnFamilyOptions createColumnOptions(
+            ColumnFamilyOptions currentOptions, Configuration options) {
+        currentOptions.setCompressionType(options.get(COMPRESSION_TYPE));
+        currentOptions.setCompactionStyle(options.get(COMPACTION_STYLE));
+        currentOptions.setLevelCompactionDynamicLevelBytes(options.get(USE_DYNAMIC_LEVEL_SIZE));
+        currentOptions.setTargetFileSizeBase(options.get(TARGET_FILE_SIZE_BASE).getBytes());
+        currentOptions.setMaxBytesForLevelBase(options.get(MAX_SIZE_LEVEL_BASE).getBytes());
+        currentOptions.setWriteBufferSize(options.get(WRITE_BUFFER_SIZE).getBytes());
+        currentOptions.setMaxWriteBufferNumber(options.get(MAX_WRITE_BUFFER_NUMBER));
+        currentOptions.setMinWriteBufferNumberToMerge(
+                options.get(MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+        TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
+        if (tableFormatConfig instanceof PlainTableConfig) {
+            // if the table format config is PlainTableConfig, we just return current
+            // column-family options
+            return currentOptions;
+        }
+
+        // reuse an existing block-based config, otherwise start from a fresh one
+        BlockBasedTableConfig blockBasedTableConfig =
+                tableFormatConfig == null
+                        ? new BlockBasedTableConfig()
+                        : (BlockBasedTableConfig) tableFormatConfig;
+
+        blockBasedTableConfig.setBlockSize(options.get(BLOCK_SIZE).getBytes());
+        blockBasedTableConfig.setMetadataBlockSize(options.get(METADATA_BLOCK_SIZE).getBytes());
+        blockBasedTableConfig.setBlockCacheSize(options.get(BLOCK_CACHE_SIZE).getBytes());
+
+        if (options.get(USE_BLOOM_FILTER)) {
+            double bitsPerKey = options.get(BLOOM_FILTER_BITS_PER_KEY);
+            boolean blockBasedMode = options.get(BLOOM_FILTER_BLOCK_BASED_MODE);
+            // NOTE(review): the BloomFilter created here is never closed by this class -
+            // confirm who owns its lifecycle.
+            BloomFilter bloomFilter = new BloomFilter(bitsPerKey, blockBasedMode);
+            blockBasedTableConfig.setFilterPolicy(bloomFilter);
+        }
+
+        return currentOptions.setTableFormatConfig(blockBasedTableConfig);
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
new file mode 100644
index 00000000..92e663e9
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.TableStreamingReader;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.connector.RocksDBOptions.LOOKUP_CACHE_ROWS;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A lookup {@link TableFunction} for file store.
+ *
+ * <p>On {@link #open(FunctionContext)} the function creates a RocksDB backed {@link LookupTable}
+ * in a task-local temporary directory and loads the table into it. Every {@link #eval(Object...)}
+ * call first pulls pending incremental batches once the refresh interval has elapsed and then
+ * emits the rows matching the join key.
+ */
+public class FileStoreLookupFunction extends TableFunction<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
+
+    private final FileStoreTable table;
+    // names of the projected fields, extended with any primary key field that was not projected
+    private final List<String> projectFields;
+    private final List<String> joinKeys;
+    @Nullable private final Predicate predicate;
+
+    private transient Duration refreshInterval;
+    // temporary directory holding the RocksDB files; deleted in close()
+    private transient File path;
+    private transient RocksDBStateFactory stateFactory;
+    private transient LookupTable lookupTable;
+
+    // timestamp when cache expires
+    private transient long nextLoadTime;
+    private transient TableStreamingReader streamingReader;
+
+    /**
+     * Creates the lookup function.
+     *
+     * @param table the table to look up; must be non-partitioned and have a primary key
+     * @param projection indexes of the projected fields within the table schema
+     * @param joinKeyIndex indexes of the join keys within {@code projection}
+     * @param predicate optional filter, passed to the reader and used to filter records
+     * @throws IllegalArgumentException if the table is partitioned or has no primary key
+     */
+    public FileStoreLookupFunction(
+            FileStoreTable table,
+            int[] projection,
+            int[] joinKeyIndex,
+            @Nullable Predicate predicate) {
+        TableSchema schema = table.schema();
+        checkArgument(
+                schema.partitionKeys().isEmpty(), "Currently only support non-partitioned table.");
+        checkArgument(schema.primaryKeys().size() > 0, "Currently only support primary key table.");
+        this.table = table;
+
+        // join keys are based on projection fields
+        this.joinKeys =
+                Arrays.stream(projection.length == 0 ? joinKeyIndex : joinKeyIndex)
+                        .mapToObj(i -> schema.fieldNames().get(projection[i]))
+                        .collect(Collectors.toList());
+
+        // collected into an explicit ArrayList because primary keys are appended below and
+        // Collectors.toList() gives no guarantee that the returned list is mutable
+        this.projectFields =
+                Arrays.stream(projection)
+                        .mapToObj(i -> schema.fieldNames().get(i))
+                        .collect(Collectors.toCollection(ArrayList::new));
+
+        // add primary keys
+        for (String field : schema.primaryKeys()) {
+            if (!projectFields.contains(field)) {
+                projectFields.add(field);
+            }
+        }
+
+        this.predicate = predicate;
+    }
+
+    /**
+     * Creates the RocksDB state in a fresh temporary directory, builds the lookup table and the
+     * streaming reader, and performs the first load.
+     */
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        String tmpDirectory = getTmpDirectory(context);
+        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
+
+        Configuration options = Configuration.fromMap(table.schema().options());
+        this.refreshInterval = options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
+        this.stateFactory = new RocksDBStateFactory(path.toString(), options);
+
+        List<String> fieldNames = table.schema().logicalRowType().getFieldNames();
+        int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
+        RowType rowType = TypeUtils.project(table.schema().logicalRowType(), projection);
+
+        PredicateFilter recordFilter = createRecordFilter(projection);
+        this.lookupTable =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        table.schema().primaryKeys(),
+                        joinKeys,
+                        recordFilter,
+                        options.getLong(LOOKUP_CACHE_ROWS));
+        this.nextLoadTime = -1;
+        this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
+
+        // do first load
+        refresh();
+    }
+
+    /**
+     * Builds the record filter over the projected row type, re-indexing the predicate (if any)
+     * from table schema positions to positions within {@code projection}.
+     */
+    private PredicateFilter createRecordFilter(int[] projection) {
+        Predicate adjustedPredicate = null;
+        if (predicate != null) {
+            // adjust to projection index
+            adjustedPredicate =
+                    transformFieldMapping(
+                                    this.predicate,
+                                    IntStream.range(0, table.schema().fields().size())
+                                            .map(i -> Ints.indexOf(projection, i))
+                                            .toArray())
+                            .orElse(null);
+        }
+        return new PredicateFilter(
+                TypeUtils.project(table.schema().logicalRowType(), projection), adjustedPredicate);
+    }
+
+    /** Used by code generation. */
+    @SuppressWarnings("unused")
+    public void eval(Object... values) throws IOException {
+        checkRefresh();
+        List<RowData> results = lookupTable.get(GenericRowData.of(values));
+        for (RowData matchedRow : results) {
+            collect(matchedRow);
+        }
+    }
+
+    /** Refreshes the lookup table when the next load time has been reached. */
+    private void checkRefresh() throws IOException {
+        if (nextLoadTime > System.currentTimeMillis()) {
+            return;
+        }
+        if (nextLoadTime > 0) {
+            // log the Duration itself: toMinutes() truncates sub-minute intervals to 0
+            LOG.info("Lookup table refresh interval {} has elapsed, refreshing", refreshInterval);
+        }
+
+        refresh();
+
+        nextLoadTime = System.currentTimeMillis() + refreshInterval.toMillis();
+    }
+
+    /** Feeds batches from the streaming reader into the lookup table until it returns null. */
+    private void refresh() throws IOException {
+        while (true) {
+            Iterator<RowData> batch = streamingReader.nextBatch();
+            if (batch == null) {
+                return;
+            }
+            this.lookupTable.refresh(batch);
+        }
+    }
+
+    /** Closes the RocksDB state and removes the temporary directory. */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (stateFactory != null) {
+                stateFactory.close();
+                stateFactory = null;
+            }
+        } finally {
+            // always remove the temporary directory, even if closing the state failed
+            if (path != null) {
+                FileUtils.deleteDirectoryQuietly(path);
+            }
+        }
+    }
+
+    /**
+     * Picks one of the task manager's temporary directories at random.
+     *
+     * <p>The runtime context is read reflectively from the {@code context} field. The field is
+     * looked up on {@link FunctionContext} itself rather than on the runtime class of the
+     * argument, so that subclasses of {@link FunctionContext} work as well.
+     */
+    private static String getTmpDirectory(FunctionContext context) {
+        try {
+            Field field = FunctionContext.class.getDeclaredField("context");
+            field.setAccessible(true);
+            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) field.get(context);
+            String[] tmpDirectories =
+                    runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
+            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/LookupTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/LookupTable.java
new file mode 100644
index 00000000..ae829a43
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/LookupTable.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/** A lookup table which provides get and refresh. */
+public interface LookupTable {
+
+    List<RowData> get(RowData key) throws IOException;
+
+    void refresh(Iterator<RowData> incremental) throws IOException;
+
+    static LookupTable create(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> primaryKey,
+            List<String> joinKey,
+            Predicate<RowData> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
+            return new PrimaryKeyLookupTable(
+                    stateFactory, rowType, joinKey, recordFilter, lruCacheSize);
+        } else {
+            return new SecondaryIndexLookupTable(
+                    stateFactory, rowType, primaryKey, joinKey, recordFilter, lruCacheSize);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/PrimaryKeyLookupTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/PrimaryKeyLookupTable.java
new file mode 100644
index 00000000..e6796162
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/PrimaryKeyLookupTable.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.store.utils.KeyProjectedRowData;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/** A {@link LookupTable} for primary key table. */
+public class PrimaryKeyLookupTable implements LookupTable {
+
+    protected final RocksDBValueState tableState;
+
+    protected final Predicate<RowData> recordFilter;
+
+    protected int[] primaryKeyMapping;
+
+    protected final KeyProjectedRowData primaryKey;
+
+    public PrimaryKeyLookupTable(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> primaryKey,
+            Predicate<RowData> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        List<String> fieldNames = rowType.getFieldNames();
+        this.primaryKeyMapping = primaryKey.stream().mapToInt(fieldNames::indexOf).toArray();
+        this.primaryKey = new KeyProjectedRowData(primaryKeyMapping);
+        this.tableState =
+                stateFactory.valueState(
+                        "table",
+                        InternalSerializers.create(TypeUtils.project(rowType, primaryKeyMapping)),
+                        InternalSerializers.create(rowType),
+                        lruCacheSize);
+        this.recordFilter = recordFilter;
+    }
+
+    @Override
+    public List<RowData> get(RowData key) throws IOException {
+        RowData value = tableState.get(key);
+        return value == null ? Collections.emptyList() : Collections.singletonList(value);
+    }
+
+    @Override
+    public void refresh(Iterator<RowData> incremental) throws IOException {
+        while (incremental.hasNext()) {
+            RowData row = incremental.next();
+            primaryKey.replaceRow(row);
+            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
+                if (recordFilter.test(row)) {
+                    tableState.put(primaryKey, row);
+                } else {
+                    // The new record under primary key is filtered
+                    // We need to delete this primary key as it no longer exists.
+                    tableState.delete(primaryKey);
+                }
+            } else {
+                tableState.delete(primaryKey);
+            }
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java
new file mode 100644
index 00000000..99dc6341
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBSetState.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Rocksdb state for key -> Set values. */
+public class RocksDBSetState extends RocksDBState<List<byte[]>> {
+
+    private static final byte[] EMPTY = new byte[0];
+
+    public RocksDBSetState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    public List<RowData> get(RowData key) throws IOException {
+        ByteArray keyBytes = wrap(serializeKey(key));
+        List<byte[]> valueBytes = cache.getIfPresent(keyBytes);
+        if (valueBytes == null) {
+            valueBytes = new ArrayList<>();
+            try (RocksIterator iterator = db.newIterator(columnFamily)) {
+                iterator.seek(keyBytes.bytes);
+
+                while (iterator.isValid() && startWithKeyPrefix(keyBytes.bytes, iterator.key())) {
+                    byte[] rawKeyBytes = iterator.key();
+                    byte[] value =
+                            Arrays.copyOfRange(
+                                    rawKeyBytes, keyBytes.bytes.length, rawKeyBytes.length);
+                    valueBytes.add(value);
+                    iterator.next();
+                }
+            }
+            cache.put(keyBytes, valueBytes);
+        }
+
+        List<RowData> values = new ArrayList<>(valueBytes.size());
+        for (byte[] value : valueBytes) {
+            valueInputView.setBuffer(value);
+            values.add(valueSerializer.deserialize(valueInputView));
+        }
+        return values;
+    }
+
+    public void retract(RowData key, RowData value) throws IOException {
+        try {
+            byte[] bytes = invalidKeyAndGetKVBytes(key, value);
+            if (db.get(columnFamily, bytes) != null) {
+                db.delete(columnFamily, writeOptions, bytes);
+            }
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void add(RowData key, RowData value) throws IOException {
+        try {
+            byte[] bytes = invalidKeyAndGetKVBytes(key, value);
+            db.put(columnFamily, writeOptions, bytes, EMPTY);
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private byte[] invalidKeyAndGetKVBytes(RowData key, RowData value) throws IOException {
+        checkArgument(value != null);
+
+        keyOutView.clear();
+        keySerializer.serialize(key, keyOutView);
+
+        // it is hard to maintain cache, invalidate the key.
+        cache.invalidate(wrap(keyOutView.getCopyOfBuffer()));
+
+        valueSerializer.serialize(value, keyOutView);
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) {
+        if (rawKeyBytes.length < keyPrefixBytes.length) {
+            return false;
+        }
+
+        for (int i = keyPrefixBytes.length; --i >= 0; ) {
+            if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBState.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBState.java
new file mode 100644
index 00000000..67ddb9fd
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBState.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/** Rocksdb state for key value. */
+public abstract class RocksDBState<CacheV> {
+
+    protected final RocksDB db;
+
+    protected final WriteOptions writeOptions;
+
+    protected final ColumnFamilyHandle columnFamily;
+
+    protected final TypeSerializer<RowData> keySerializer;
+
+    protected final TypeSerializer<RowData> valueSerializer;
+
+    protected final DataOutputSerializer keyOutView;
+
+    protected final DataInputDeserializer valueInputView;
+
+    protected final DataOutputSerializer valueOutputView;
+
+    protected final Cache<ByteArray, CacheV> cache;
+
+    public RocksDBState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        this.db = db;
+        this.columnFamily = columnFamily;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.keyOutView = new DataOutputSerializer(32);
+        this.valueInputView = new DataInputDeserializer();
+        this.valueOutputView = new DataOutputSerializer(32);
+        this.writeOptions = new WriteOptions().setDisableWAL(true);
+        this.cache = CacheBuilder.newBuilder().maximumSize(lruCacheSize).build();
+    }
+
+    protected byte[] serializeKey(RowData key) throws IOException {
+        keyOutView.clear();
+        keySerializer.serialize(key, keyOutView);
+        return keyOutView.getCopyOfBuffer();
+    }
+
+    protected ByteArray wrap(byte[] bytes) {
+        return new ByteArray(bytes);
+    }
+
+    protected Reference ref(byte[] bytes) {
+        return new Reference(bytes);
+    }
+
+    /** A class wraps byte[] to implement equals and hashCode. */
+    protected static class ByteArray {
+
+        protected final byte[] bytes;
+
+        protected ByteArray(byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(bytes);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ByteArray byteArray = (ByteArray) o;
+            return Arrays.equals(bytes, byteArray.bytes);
+        }
+    }
+
+    /** A class wraps byte[] to indicate contain or not contain. */
+    protected static class Reference {
+
+        @Nullable protected final byte[] bytes;
+
+        protected Reference(@Nullable byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        public boolean isPresent() {
+            return bytes != null;
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java
new file mode 100644
index 00000000..1ece9ae0
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBStateFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.RocksDBOptions;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory to create state.
+ *
+ * <p>The factory opens one RocksDB instance and creates one column family per state. It owns the
+ * native resources it creates (options, column family handles and the database itself) and
+ * releases all of them in {@link #close()}. States created by this factory must not be used
+ * after the factory has been closed.
+ */
+public class RocksDBStateFactory implements Closeable {
+
+    private final DBOptions dbOptions;
+
+    private final ColumnFamilyOptions columnFamilyOptions;
+
+    /** Options the database was opened with, kept so that they can be released on close. */
+    private final Options options;
+
+    /** Handles of all column families created through this factory. */
+    private final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+    private RocksDB db;
+
+    /**
+     * Opens (and creates if missing) the RocksDB instance at the given path.
+     *
+     * @param path directory of the RocksDB instance
+     * @param conf configuration the RocksDB options are derived from
+     * @throws IOException if the instance cannot be opened
+     */
+    public RocksDBStateFactory(String path, Configuration conf) throws IOException {
+        this.dbOptions =
+                RocksDBOptions.createDBOptions(
+                        new DBOptions()
+                                .setUseFsync(false)
+                                .setStatsDumpPeriodSec(0)
+                                .setCreateIfMissing(true),
+                        conf);
+        this.columnFamilyOptions =
+                RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), conf);
+        this.options = new Options(dbOptions, columnFamilyOptions);
+
+        try {
+            this.db = RocksDB.open(options, path);
+        } catch (RocksDBException e) {
+            // nobody can call close() on a half constructed factory, release the options here
+            closeOptions();
+            throw new IOException("Error while opening RocksDB instance.", e);
+        }
+    }
+
+    /**
+     * Creates a value state backed by a new column family with the given name.
+     *
+     * @throws IOException if the column family cannot be created
+     */
+    public RocksDBValueState valueState(
+            String name,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+        return new RocksDBValueState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    /**
+     * Creates a set state backed by a new column family with the given name.
+     *
+     * @throws IOException if the column family cannot be created
+     */
+    public RocksDBSetState setState(
+            String name,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize)
+            throws IOException {
+        return new RocksDBSetState(
+                db, createColumnFamily(name), keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    /** Creates the column family and remembers its handle so that it is released on close. */
+    private ColumnFamilyHandle createColumnFamily(String name) throws IOException {
+        try {
+            ColumnFamilyHandle handle =
+                    db.createColumnFamily(
+                            new ColumnFamilyDescriptor(
+                                    name.getBytes(StandardCharsets.UTF_8), columnFamilyOptions));
+            columnFamilyHandles.add(handle);
+            return handle;
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Releases the column family handles, the database and the options, in this order. Calling
+     * this method more than once has no further effect.
+     */
+    @Override
+    public void close() throws IOException {
+        if (db != null) {
+            // column family handles have to be released before the database they belong to
+            for (ColumnFamilyHandle handle : columnFamilyHandles) {
+                handle.close();
+            }
+            columnFamilyHandles.clear();
+
+            db.close();
+            db = null;
+
+            closeOptions();
+        }
+    }
+
+    /** Releases the native option objects created in the constructor. */
+    private void closeOptions() {
+        options.close();
+        columnFamilyOptions.close();
+        dbOptions.close();
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBValueState.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBValueState.java
new file mode 100644
index 00000000..7c601c51
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/RocksDBValueState.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Rocksdb state for key -> a single value.
+ *
+ * <p>The cache holds a {@link RocksDBState.Reference} per looked-up key, so that the absence of a
+ * key is cached as well.
+ */
+public class RocksDBValueState extends RocksDBState<RocksDBState.Reference> {
+
+    public RocksDBValueState(
+            RocksDB db,
+            ColumnFamilyHandle columnFamily,
+            TypeSerializer<RowData> keySerializer,
+            TypeSerializer<RowData> valueSerializer,
+            long lruCacheSize) {
+        super(db, columnFamily, keySerializer, valueSerializer, lruCacheSize);
+    }
+
+    /**
+     * Returns the value stored for the key.
+     *
+     * @return the deserialized value, or {@code null} if there is none
+     * @throws IOException wrapping any exception raised while reading or deserializing
+     */
+    @Nullable
+    public RowData get(RowData key) throws IOException {
+        try {
+            Reference found = get(wrap(serializeKey(key)));
+            if (!found.isPresent()) {
+                return null;
+            }
+            valueInputView.setBuffer(found.bytes);
+            return valueSerializer.deserialize(valueInputView);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /** Returns the cached reference of the key, loading and caching it first if necessary. */
+    private Reference get(ByteArray keyBytes) throws RocksDBException {
+        Reference cached = cache.getIfPresent(keyBytes);
+        if (cached != null) {
+            return cached;
+        }
+
+        Reference loaded = ref(db.get(columnFamily, keyBytes.bytes));
+        cache.put(keyBytes, loaded);
+        return loaded;
+    }
+
+    /**
+     * Stores the value under the key, in RocksDB first and in the cache afterwards.
+     *
+     * @param value the value to store, must not be {@code null}
+     * @throws IOException if serialization fails or RocksDB reports an error
+     */
+    public void put(RowData key, RowData value) throws IOException {
+        checkArgument(value != null);
+
+        try {
+            byte[] keyBytes = serializeKey(key);
+
+            valueOutputView.clear();
+            valueSerializer.serialize(value, valueOutputView);
+            byte[] valueBytes = valueOutputView.getCopyOfBuffer();
+
+            db.put(columnFamily, writeOptions, keyBytes, valueBytes);
+            cache.put(wrap(keyBytes), ref(valueBytes));
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Removes the value of the key if there is one, and caches the absence of the key.
+     *
+     * @throws IOException if serialization fails or RocksDB reports an error
+     */
+    public void delete(RowData key) throws IOException {
+        try {
+            ByteArray wrappedKey = wrap(serializeKey(key));
+            if (!get(wrappedKey).isPresent()) {
+                // nothing stored, and the lookup above already cached that
+                return;
+            }
+
+            db.delete(columnFamily, writeOptions, wrappedKey.bytes);
+            cache.put(wrappedKey, ref(null));
+        } catch (RocksDBException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/SecondaryIndexLookupTable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/SecondaryIndexLookupTable.java
new file mode 100644
index 00000000..f2b03712
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/lookup/SecondaryIndexLookupTable.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.store.utils.KeyProjectedRowData;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * A {@link LookupTable} for primary key table which provides lookup by secondary key.
+ *
+ * <p>In addition to the table state inherited from {@link PrimaryKeyLookupTable}, a set state
+ * named "sec-index" maps each secondary key to the primary keys of the rows carrying it. The
+ * cache size is split evenly between the two states.
+ */
+public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {
+
+    /** Index from secondary key to primary keys. */
+    private final RocksDBSetState indexState;
+
+    /** Secondary key projection that is re-pointed to a row before each index access. */
+    private final KeyProjectedRowData secKeyRow;
+
+    public SecondaryIndexLookupTable(
+            RocksDBStateFactory stateFactory,
+            RowType rowType,
+            List<String> primaryKey,
+            List<String> secKey,
+            Predicate<RowData> recordFilter,
+            long lruCacheSize)
+            throws IOException {
+        super(stateFactory, rowType, primaryKey, recordFilter, lruCacheSize / 2);
+
+        List<String> allFields = rowType.getFieldNames();
+        int[] secKeyMapping = secKey.stream().mapToInt(allFields::indexOf).toArray();
+        RowType secKeyType = TypeUtils.project(rowType, secKeyMapping);
+        RowType primaryKeyType = TypeUtils.project(rowType, primaryKeyMapping);
+
+        this.secKeyRow = new KeyProjectedRowData(secKeyMapping);
+        this.indexState =
+                stateFactory.setState(
+                        "sec-index",
+                        InternalSerializers.create(secKeyType),
+                        InternalSerializers.create(primaryKeyType),
+                        lruCacheSize / 2);
+    }
+
+    /**
+     * Resolves the secondary key to primary keys through the index and returns the rows stored
+     * for them. Primary keys without a stored row are skipped.
+     */
+    @Override
+    public List<RowData> get(RowData key) throws IOException {
+        List<RowData> indexedKeys = indexState.get(key);
+        List<RowData> rows = new ArrayList<>(indexedKeys.size());
+        for (RowData indexedKey : indexedKeys) {
+            RowData stored = tableState.get(indexedKey);
+            if (stored == null) {
+                continue;
+            }
+            rows.add(stored);
+        }
+        return rows;
+    }
+
+    /** Applies the changes to the table state and keeps the secondary index in step with it. */
+    @Override
+    public void refresh(Iterator<RowData> incremental) throws IOException {
+        while (incremental.hasNext()) {
+            RowData row = incremental.next();
+            primaryKey.replaceRow(row);
+
+            RowKind kind = row.getRowKind();
+            if (kind != RowKind.INSERT && kind != RowKind.UPDATE_AFTER) {
+                // retraction: drop the row and the index entry derived from it
+                tableState.delete(primaryKey);
+                indexState.retract(secKeyRow.replaceRow(row), primaryKey);
+                continue;
+            }
+
+            // the row stored so far may carry another secondary key, remove its index entry
+            RowData previous = tableState.get(primaryKey);
+            if (previous != null) {
+                indexState.retract(secKeyRow.replaceRow(previous), primaryKey);
+            }
+
+            if (!recordFilter.test(row)) {
+                tableState.delete(primaryKey);
+                continue;
+            }
+
+            tableState.put(primaryKey, row);
+            indexState.add(secKeyRow.replaceRow(row), primaryKey);
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
index 8a8762c7..f6fb9ac4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumerator.java
@@ -21,8 +21,9 @@ package org.apache.flink.table.store.connector.source;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.table.source.SnapshotEnumerator;
+import org.apache.flink.table.store.table.source.SnapshotEnumerator.EnumeratorResult;
 import org.apache.flink.table.store.table.source.TableScan;
 
 import org.slf4j.Logger;
@@ -40,7 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import static org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint.INVALID_SNAPSHOT;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -54,10 +54,6 @@ public class ContinuousFileSplitEnumerator
 
     private final SplitEnumeratorContext<FileStoreSourceSplit> context;
 
-    private final TableScan scan;
-
-    private final SnapshotManager snapshotManager;
-
     private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
 
     private final long discoveryInterval;
@@ -72,22 +68,20 @@ public class ContinuousFileSplitEnumerator
 
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
+            Path location,
             TableScan scan,
-            SnapshotManager snapshotManager,
             Collection<FileStoreSourceSplit> remainSplits,
             long currentSnapshotId,
             long discoveryInterval) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
-        this.scan = checkNotNull(scan);
-        this.snapshotManager = snapshotManager;
         this.bucketSplits = new HashMap<>();
         addSplits(remainSplits);
         this.currentSnapshotId = currentSnapshotId;
         this.discoveryInterval = discoveryInterval;
         this.readersAwaitingSplit = new HashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
-        this.snapshotEnumerator = new SnapshotEnumerator(currentSnapshotId);
+        this.snapshotEnumerator = new SnapshotEnumerator(location, scan, currentSnapshotId);
     }
 
     private void addSplits(Collection<FileStoreSourceSplit> splits) {
@@ -159,7 +153,7 @@ public class ContinuousFileSplitEnumerator
         }
 
         currentSnapshotId = result.snapshotId;
-        addSplits(result.splits);
+        addSplits(splitGenerator.createSplits(result.plan));
         assignSplits();
     }
 
@@ -182,62 +176,4 @@ public class ContinuousFileSplitEnumerator
                     }
                 });
     }
-
-    private class SnapshotEnumerator implements Callable<EnumeratorResult> {
-
-        private long nextSnapshotId;
-
-        private SnapshotEnumerator(long currentSnapshot) {
-            this.nextSnapshotId = currentSnapshot + 1;
-        }
-
-        @Nullable
-        @Override
-        public EnumeratorResult call() {
-            // TODO sync with processDiscoveredSplits to avoid too more splits in memory
-            while (true) {
-                if (!snapshotManager.snapshotExists(nextSnapshotId)) {
-                    // TODO check latest snapshot id, expired?
-                    LOG.debug(
-                            "Next snapshot id {} not exists, wait for it to be generated.",
-                            nextSnapshotId);
-                    return null;
-                }
-
-                Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
-                if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
-                    if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
-                        LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
-                    }
-
-                    nextSnapshotId++;
-                    LOG.debug(
-                            "Next snapshot id {} is not append, but is {}, check next one.",
-                            nextSnapshotId,
-                            snapshot.commitKind());
-                    continue;
-                }
-
-                List<FileStoreSourceSplit> splits =
-                        splitGenerator.createSplits(scan.withSnapshot(nextSnapshotId).plan());
-                EnumeratorResult result = new EnumeratorResult(nextSnapshotId, splits);
-                LOG.debug("Find snapshot id {}, it has {} splits.", nextSnapshotId, splits.size());
-
-                nextSnapshotId++;
-                return result;
-            }
-        }
-    }
-
-    private static class EnumeratorResult {
-
-        private final long snapshotId;
-
-        private final List<FileStoreSourceSplit> splits;
-
-        private EnumeratorResult(long snapshotId, List<FileStoreSourceSplit> splits) {
-            this.snapshotId = snapshotId;
-            this.splits = splits;
-        }
-    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
index b54cbde1..71742cb0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/FileStoreSource.java
@@ -139,8 +139,8 @@ public class FileStoreSource
             long currentSnapshot = snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : snapshotId;
             return new ContinuousFileSplitEnumerator(
                     context,
+                    table.location(),
                     scan.withIncremental(true), // the subsequent planning is all incremental
-                    snapshotManager,
                     splits,
                     currentSnapshot,
                     discoveryInterval);
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index 3404308a..68a976b0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
@@ -36,6 +38,7 @@ import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
+import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.predicate.PredicateConverter;
@@ -45,12 +48,14 @@ import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.utils.Projection;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
@@ -65,6 +70,7 @@ import static org.apache.flink.table.store.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE
  */
 public class TableStoreSource
         implements ScanTableSource,
+                LookupTableSource,
                 SupportsFilterPushDown,
                 SupportsProjectionPushDown,
                 SupportsLimitPushDown,
@@ -229,4 +235,19 @@ public class TableStoreSource
     public void applyLimit(long limit) {
         this.limit = limit;
     }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
+        if (limit != null) { // a pushed-down limit is rejected for lookup usage
+            throw new RuntimeException(
+                    "Limit push down should not happen in Lookup source, but it is " + limit);
+        }
+        int[] projection =
+                projectFields == null // no projection pushed down: take every schema field
+                        ? IntStream.range(0, table.schema().fields().size()).toArray()
+                        : Projection.of(projectFields).toTopLevelIndexes();
+        int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes(); // join key indexes
+        return TableFunctionProvider.of(
+                new FileStoreLookupFunction(table, projection, joinKey, predicate));
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
new file mode 100644
index 00000000..b5cf0013
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join. */
+public class LookupJoinITCase extends AbstractTestBase {
+
+    private TableEnvironment env;
+
+    @Before
+    public void before() throws Exception {
+        env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        env.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString(); // new warehouse folder
+        env.executeSql(
+                String.format(
+                        "CREATE CATALOG my_catalog WITH ('type'='table-store', 'warehouse'='%s')",
+                        path));
+        executeSql("USE CATALOG my_catalog");
+        executeSql("CREATE TABLE T (i INT, `proctime` AS PROCTIME())"); // left side of the joins
+        executeSql( // DIM is the table every test looks up into; i is its primary key
+                "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT) WITH"
+                        + " ('continuous.discovery-interval'='1 ms')");
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null)); // DIM has no i = 3 yet
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 44, 444, 4444), // key 2 now carries the second insert
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupProjection() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result) // D.k2 is not selected, so rows carry only j and k1
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 44, 444), // key 2 now carries the second insert
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterPk() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.i > 2";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result) // DIM only holds i = 1 and i = 2, neither satisfies D.i > 2
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null), Row.of(2, null, null), Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null),
+                        Row.of(2, null, null),
+                        Row.of(3, 33, 333), // the only DIM row with i > 2
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterSelect() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result) // key 1 has k1 = 111, which does not satisfy k1 > 111
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterUnSelect() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result) // the filter column k2 is not in the SELECT list; key 1 has k2 = 1111
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, null, null),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testLookupFilterUnSelectAndUpdate() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result) // both initial DIM rows satisfy k2 < 4444
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, null, null), // key 2 now has k2 = 4444 and no longer matches
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookup() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4); // 3 inputs, but j = 22 matches two DIM rows
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, 22, 333, 3333),
+                        Row.of(null, 33, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(null, 22, null, null), // keys 2 and 3 moved away from j = 22
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(2, 44, 444, 4444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupProjection() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4); // 3 inputs, but j = 22 matches two DIM rows
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result) // after the update no DIM row has j = 22 any more
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, null), Row.of(33, 333), Row.of(44, 444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterPk() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.i > 2";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(3); // of the two j = 22 rows only i = 3 passes i > 2
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(11, null), Row.of(22, 333), Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result) // j = 44 belongs to i = 2, which fails i > 2
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, null), Row.of(33, 333), Row.of(44, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterSelect() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k1 > 111";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4); // j = 11 has k1 = 111 and is filtered out
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result) // after the update no DIM row has j = 22 any more
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, null), Row.of(33, 333), Row.of(44, 444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterUnSelect() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4); // j = 11 has k2 = 1111 and is filtered out
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result) // after the update no DIM row has j = 22 any more
+                .containsExactlyInAnyOrder(
+                        Row.of(11, null), Row.of(22, null), Row.of(33, 333), Row.of(44, 444));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
+        executeSql(
+                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), (3, 22, 333, 3333)");
+
+        String query =
+                "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        List<Row> result = iterator.collect(4); // all three initial DIM rows satisfy k2 < 4444
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        result = iterator.collect(4);
+        assertThat(result) // j = 44 belongs to the row with k2 = 4444, which fails k2 < 4444
+                .containsExactlyInAnyOrder(
+                        Row.of(11, 111), Row.of(22, null), Row.of(33, 333), Row.of(44, null));
+
+        iterator.close();
+    }
+
+    @Test
+    public void testRepeatRefresh() throws Exception {
+        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+
+        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));
+
+        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444)"); // first of two separate inserts
+        executeSql("INSERT INTO DIM VALUES (3, 33, 333, 3333)"); // second insert before any lookup
+        Thread.sleep(2000); // wait for the lookup side to refresh from the updated DIM
+        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result) // both inserts must be visible to the join
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 44, 444),
+                        Row.of(3, 33, 333),
+                        Row.of(4, null, null));
+
+        iterator.close();
+    }
+
+    private void executeSql(String sql) throws ExecutionException, InterruptedException {
+        env.executeSql(sql).await(); // block on the TableResult instead of returning at once
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/lookup/LookupTableTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/lookup/LookupTableTest.java
new file mode 100644
index 00000000..b9569a88
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/lookup/LookupTableTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LookupTable}. */
+public class LookupTableTest {
+
+    @TempDir Path tempDir;
+
+    private RocksDBStateFactory stateFactory;
+
+    private RowType rowType;
+
+    @BeforeEach
+    public void before() throws IOException {
+        this.stateFactory = new RocksDBStateFactory(tempDir.toString(), new Configuration());
+        this.rowType = RowType.of(new IntType(), new IntType(), new IntType()); // three INT fields
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        if (stateFactory != null) { // stays null when before() did not get to assign it
+            stateFactory.close();
+        }
+    }
+
+    @Test
+    public void testPkTable() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f0"), // same field in both key lists
+                        r -> r.getInt(0) < 3, // row filter on f0
+                        ThreadLocalRandom.current().nextInt(2) * 10); // randomly 0 or 10
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        List<RowData> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 22, 222)).iterator()); // same f0, new values
+        result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        table.refresh(singletonList(row(RowKind.DELETE, 1, 11, 111)).iterator());
+        assertThat(table.get(row(1))).hasSize(0); // the DELETE row removed f0 = 1
+
+        table.refresh(singletonList(row(3, 33, 333)).iterator()); // f0 = 3 fails f0 < 3
+        assertThat(table.get(row(3))).hasSize(0);
+    }
+
+    @Test
+    public void testPkTableFilter() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f0"), // same field in both key lists
+                        r -> r.getInt(1) < 22, // row filter on f1, a non-key field
+                        ThreadLocalRandom.current().nextInt(2) * 10); // randomly 0 or 10
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        List<RowData> result = table.get(row(1));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 22, 222)).iterator()); // new f1 = 22 fails f1 < 22
+        result = table.get(row(1));
+        assertThat(result).hasSize(0); // the earlier (1, 11, 111) must not be returned either
+    }
+
+    @Test
+    public void testSecKeyTable() throws IOException {
+        LookupTable table =
+                LookupTable.create(
+                        stateFactory,
+                        rowType,
+                        singletonList("f0"),
+                        singletonList("f1"), // the get() calls below pass f1 values
+                        r -> r.getInt(0) < 3, // row filter on f0
+                        ThreadLocalRandom.current().nextInt(2) * 10); // randomly 0 or 10
+
+        table.refresh(singletonList(row(1, 11, 111)).iterator());
+        List<RowData> result = table.get(row(11));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 11, 111);
+
+        table.refresh(singletonList(row(1, 22, 222)).iterator()); // f0 = 1 moves to f1 = 22
+        assertThat(table.get(row(11))).hasSize(0); // nothing is left under f1 = 11
+        result = table.get(row(22));
+        assertThat(result).hasSize(1);
+        assertRow(result.get(0), 1, 22, 222);
+
+        table.refresh(singletonList(row(2, 22, 222)).iterator()); // second row with f1 = 22
+        result = table.get(row(22));
+        assertThat(result).hasSize(2);
+        assertRow(result.get(0), 1, 22, 222);
+        assertRow(result.get(1), 2, 22, 222);
+
+        table.refresh(singletonList(row(RowKind.DELETE, 2, 22, 222)).iterator());
+        result = table.get(row(22));
+        assertThat(result).hasSize(1); // only the f0 = 2 row was deleted
+        assertRow(result.get(0), 1, 22, 222);
+
+        table.refresh(singletonList(row(3, 33, 333)).iterator()); // f0 = 3 fails f0 < 3
+        assertThat(table.get(row(33))).hasSize(0);
+    }
+
+    private static RowData row(Object... values) {
+        return row(RowKind.INSERT, values); // rows without an explicit kind are INSERT rows
+    }
+
+    private static RowData row(RowKind kind, Object... values) {
+        GenericRowData row = new GenericRowData(kind, values.length); // one field per value
+
+        for (int i = 0; i < values.length; ++i) {
+            row.setField(i, values[i]);
+        }
+
+        return row;
+    }
+
+    private static void assertRow(RowData resultRow, int... expected) {
+        int[] results = new int[expected.length]; // only the first expected.length fields are read
+        for (int i = 0; i < results.length; i++) {
+            results[i] = resultRow.getInt(i);
+        }
+        assertThat(results).containsExactly(expected);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
new file mode 100644
index 00000000..83eae895
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/SnapshotEnumerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Enumerator to enumerate incremental snapshots.
+ *
+ * <p>Each {@link #call()} continues from the snapshot id after the last one it handled, skips
+ * snapshots whose commit kind is not {@code APPEND} and returns the plan of the first {@code
+ * APPEND} snapshot it finds.
+ */
+public class SnapshotEnumerator implements Callable<SnapshotEnumerator.EnumeratorResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotEnumerator.class);
+
+    private final SnapshotManager snapshotManager;
+
+    private final TableScan scan;
+
+    /** Id of the next snapshot to examine; incremented after each snapshot that is handled. */
+    private long nextSnapshotId;
+
+    /**
+     * Creates an enumerator whose first examined snapshot id is {@code currentSnapshot + 1}.
+     *
+     * @param tablePath path used to create the {@link SnapshotManager}
+     * @param scan scan used to plan every discovered snapshot
+     * @param currentSnapshot snapshot id directly preceding the first id to examine
+     */
+    public SnapshotEnumerator(Path tablePath, TableScan scan, long currentSnapshot) {
+        this.snapshotManager = new SnapshotManager(tablePath);
+        this.scan = scan;
+        this.nextSnapshotId = currentSnapshot + 1;
+    }
+
+    /**
+     * Looks for the next {@code APPEND} snapshot, starting at the current next snapshot id.
+     *
+     * @return the id and plan of the snapshot found, or {@code null} if the next snapshot id
+     *     does not exist yet
+     */
+    @Nullable
+    @Override
+    public EnumeratorResult call() {
+        // TODO sync with processDiscoveredSplits to avoid too many splits in memory
+        while (true) {
+            if (!snapshotManager.snapshotExists(nextSnapshotId)) {
+                // TODO check latest snapshot id, expired?
+                LOG.debug(
+                        "Next snapshot id {} does not exist, wait for the snapshot generation.",
+                        nextSnapshotId);
+                return null;
+            }
+
+            Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
+            if (snapshot.commitKind() != Snapshot.CommitKind.APPEND) {
+                if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+                    LOG.warn("Ignore overwrite snapshot id {}.", nextSnapshotId);
+                }
+
+                // Log before advancing, so the reported id is the one of the skipped snapshot
+                // and matches the commit kind printed next to it.
+                LOG.debug(
+                        "Next snapshot id {} is not APPEND, but is {}, check next one.",
+                        nextSnapshotId,
+                        snapshot.commitKind());
+                nextSnapshotId++;
+                continue;
+            }
+
+            TableScan.Plan plan = scan.withSnapshot(nextSnapshotId).plan();
+            EnumeratorResult result = new EnumeratorResult(nextSnapshotId, plan);
+            LOG.debug("Find snapshot id {}.", nextSnapshotId);
+
+            nextSnapshotId++;
+            return result;
+        }
+    }
+
+    /** Enumerator result: a discovered snapshot id together with the plan created for it. */
+    public static class EnumeratorResult {
+
+        public final long snapshotId;
+
+        public final TableScan.Plan plan;
+
+        private EnumeratorResult(long snapshotId, TableScan.Plan plan) {
+            this.snapshotId = snapshotId;
+            this.plan = plan;
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
new file mode 100644
index 00000000..58a420e2
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableStreamingReader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.utils.TypeUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.IntUnaryOperator;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+
+/**
+ * A streaming reader to read table.
+ *
+ * <p>The first {@link #nextBatch()} call plans a scan of the table and reads it; every later
+ * call reads the plan of the next snapshot delivered by a {@link SnapshotEnumerator}.
+ */
+public class TableStreamingReader {
+
+    private final FileStoreTable table;
+    private final int[] projection;
+    @Nullable private final Predicate predicate;
+    @Nullable private final PredicateFilter recordFilter;
+
+    private SnapshotEnumerator enumerator;
+
+    public TableStreamingReader(
+            FileStoreTable table, int[] projection, @Nullable Predicate predicate) {
+        this.table = table;
+        this.projection = projection;
+        this.predicate = predicate;
+        this.recordFilter =
+                predicate == null ? null : createRecordFilter(table, projection, predicate);
+    }
+
+    /**
+     * Builds the filter that is applied to the projected rows produced by the readers.
+     *
+     * <p>For pk table: only filter by pk, the stream is upsert instead of changelog. For non-pk
+     * table: filter all. A field that must not be filtered is mapped to -1, otherwise to its
+     * position inside {@code projection} as reported by {@code Ints.indexOf}.
+     */
+    private static PredicateFilter createRecordFilter(
+            FileStoreTable table, int[] projection, Predicate predicate) {
+        List<String> allFields = table.schema().fieldNames();
+        List<String> pkFields = table.schema().primaryKeys();
+
+        IntUnaryOperator toProjectedIndex =
+                fieldIdx -> {
+                    int projectedIdx = Ints.indexOf(projection, fieldIdx);
+                    if (pkFields.isEmpty() || pkFields.contains(allFields.get(fieldIdx))) {
+                        return projectedIdx;
+                    }
+                    return -1;
+                };
+
+        int fieldCount = table.schema().fields().size();
+        int[] mapping = IntStream.range(0, fieldCount).map(toProjectedIndex).toArray();
+
+        return new PredicateFilter(
+                TypeUtils.project(table.schema().logicalRowType(), projection),
+                transformFieldMapping(predicate, mapping).orElse(null));
+    }
+
+    /**
+     * Reads the next batch of rows.
+     *
+     * @return an iterator over the rows of the batch, or {@code null} if the enumerator did not
+     *     return a result
+     */
+    @Nullable
+    public Iterator<RowData> nextBatch() throws IOException {
+        if (enumerator != null) {
+            SnapshotEnumerator.EnumeratorResult next = enumerator.call();
+            if (next == null) {
+                return null;
+            }
+            return read(next.plan);
+        }
+
+        // First call: plan the initial scan and set up the enumerator after its snapshot id.
+        TableScan scan = table.newScan();
+        if (predicate != null) {
+            scan.withFilter(predicate);
+        }
+        TableScan.Plan initialPlan = scan.plan();
+        long snapshotId = Objects.requireNonNull(initialPlan.snapshotId);
+        enumerator =
+                new SnapshotEnumerator(table.location(), scan.withIncremental(true), snapshotId);
+        return read(initialPlan);
+    }
+
+    /** Creates one reader per split of the plan and iterates over their concatenation. */
+    private Iterator<RowData> read(TableScan.Plan plan) throws IOException {
+        TableRead tableRead = table.newRead().withProjection(projection);
+        if (predicate != null) {
+            tableRead.withFilter(predicate);
+        }
+
+        List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
+        for (Split split : plan.splits) {
+            suppliers.add(() -> tableRead.createReader(split));
+        }
+        Iterator<RowData> records =
+                new RecordReaderIterator<>(ConcatRecordReader.create(suppliers));
+        if (recordFilter == null) {
+            return records;
+        }
+        return Iterators.filter(records, recordFilter::test);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
index 137232ad..90b1bd4f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/PredicateConverterTest.java
@@ -55,7 +55,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 public class PredicateConverterTest {
 
     private static final PredicateBuilder BUILDER =
-            new PredicateBuilder(RowType.of(new BigIntType(), new DoubleType()));
+            new PredicateBuilder(
+                    new RowType(
+                            Arrays.asList(
+                                    new RowType.RowField("long1", new BigIntType()),
+                                    new RowType.RowField("double1", new DoubleType()))));
 
     private static final PredicateConverter CONVERTER = new PredicateConverter(BUILDER);
 
@@ -73,7 +77,8 @@ public class PredicateConverterTest {
 
     public static Stream<Arguments> provideResolvedExpression() {
         FieldReferenceExpression longRefExpr =
-                new FieldReferenceExpression("long1", DataTypes.BIGINT(), 0, 0);
+                new FieldReferenceExpression(
+                        "long1", DataTypes.BIGINT(), Integer.MAX_VALUE, Integer.MAX_VALUE);
         ValueLiteralExpression intLitExpr = new ValueLiteralExpression(10);
         ValueLiteralExpression intLitExpr2 = new ValueLiteralExpression(20);
         long longLit = 10L;
@@ -81,7 +86,8 @@ public class PredicateConverterTest {
                 new ValueLiteralExpression(null, DataTypes.BIGINT());
 
         FieldReferenceExpression doubleRefExpr =
-                new FieldReferenceExpression("double1", DataTypes.DOUBLE(), 0, 1);
+                new FieldReferenceExpression(
+                        "double1", DataTypes.DOUBLE(), Integer.MAX_VALUE, Integer.MAX_VALUE);
         ValueLiteralExpression floatLitExpr = new ValueLiteralExpression(3.14f);
         double doubleLit = 3.14d;
 
@@ -736,7 +742,7 @@ public class PredicateConverterTest {
     }
 
     private static FieldReferenceExpression field(int i, DataType type) {
-        return new FieldReferenceExpression("name", type, 0, i);
+        return new FieldReferenceExpression("f" + i, type, Integer.MAX_VALUE, Integer.MAX_VALUE);
     }
 
     private static CallExpression call(FunctionDefinition function, ResolvedExpression... args) {