You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/01/17 01:45:24 UTC

[doris-flink-connector] branch master updated: [improve] improve lookup join to async and batch (#97)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 522bbf4  [improve] improve lookup join to async and batch (#97)
522bbf4 is described below

commit 522bbf41b29c88911373660a674e183a059714a3
Author: wudi <67...@qq.com>
AuthorDate: Tue Jan 17 09:45:19 2023 +0800

    [improve] improve lookup join to async and batch (#97)
    
    * imporve lookup join
---
 .../doris/flink/cfg/DorisConnectionOptions.java    |  19 +-
 .../apache/doris/flink/cfg/DorisLookupOptions.java |  68 ++++++-
 .../org/apache/doris/flink/cfg/DorisOptions.java   |  16 +-
 .../flink/connection/JdbcConnectionProvider.java   |  33 ++++
 .../connection/SimpleJdbcConnectionProvider.java   |  74 ++++++++
 .../converter/DorisRowConverter.java               |   4 +-
 .../doris/flink/lookup/DorisJdbcLookupReader.java  | 132 +++++++++++++
 .../doris/flink/lookup/DorisLookupReader.java      |  31 +++
 .../apache/doris/flink/lookup/ExecutionPool.java   | 209 +++++++++++++++++++++
 .../java/org/apache/doris/flink/lookup/Get.java    |  42 +++++
 .../org/apache/doris/flink/lookup/GetAction.java   |  44 +++++
 .../apache/doris/flink/lookup/LookupSchema.java    |  91 +++++++++
 .../java/org/apache/doris/flink/lookup/Record.java |  81 ++++++++
 .../org/apache/doris/flink/lookup/RecordKey.java   | 138 ++++++++++++++
 .../java/org/apache/doris/flink/lookup/Worker.java | 200 ++++++++++++++++++++
 .../doris/flink/table/DorisConfigOptions.java      |  29 ++-
 .../flink/table/DorisDynamicTableFactory.java      |  15 ++
 .../doris/flink/table/DorisDynamicTableSource.java |  31 ++-
 .../table/DorisRowDataAsyncLookupFunction.java     | 128 +++++++++++++
 .../table/DorisRowDataJdbcLookupFunction.java      | 120 ++++++++++++
 .../doris/flink/lookup/LookupJoinExample.java      |  86 +++++++++
 .../org/apache/doris/flink/lookup/RecordTest.java  |  90 +++++++++
 22 files changed, 1664 insertions(+), 17 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 9b2187c..00abd52 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -30,6 +30,7 @@ public class DorisConnectionOptions implements Serializable {
     protected final String fenodes;
     protected final String username;
     protected final String password;
+    protected String jdbcUrl;
 
     public DorisConnectionOptions(String fenodes, String username, String password) {
         this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes  is empty");
@@ -37,6 +38,11 @@ public class DorisConnectionOptions implements Serializable {
         this.password = password;
     }
 
+    public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl){
+        this(fenodes,username,password);
+        this.jdbcUrl = jdbcUrl;
+    }
+
     public String getFenodes() {
         return fenodes;
     }
@@ -49,6 +55,10 @@ public class DorisConnectionOptions implements Serializable {
         return password;
     }
 
+    public String getJdbcUrl(){
+        return jdbcUrl;
+    }
+
     /**
      * Builder for {@link DorisConnectionOptions}.
      */
@@ -57,6 +67,8 @@ public class DorisConnectionOptions implements Serializable {
         private String username;
         private String password;
 
+        private String jdbcUrl;
+
         public DorisConnectionOptionsBuilder withFenodes(String fenodes) {
             this.fenodes = fenodes;
             return this;
@@ -72,8 +84,13 @@ public class DorisConnectionOptions implements Serializable {
             return this;
         }
 
+        public DorisConnectionOptionsBuilder withJdbcUrl(String jdbcUrl) {
+            this.jdbcUrl = jdbcUrl;
+            return this;
+        }
+
         public DorisConnectionOptions build() {
-            return new DorisConnectionOptions(fenodes, username, password);
+            return new DorisConnectionOptions(fenodes, username, password, jdbcUrl);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
index f0f7eb4..a1d8f30 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
@@ -24,12 +24,27 @@ public class DorisLookupOptions implements Serializable {
     private final long cacheMaxSize;
     private final long cacheExpireMs;
     private final int maxRetryTimes;
+    private final int jdbcReadBatchSize;
+    private final int jdbcReadBatchQueueSize;
+    private final int jdbcReadThreadSize;
+
+    private final boolean async;
 
     public DorisLookupOptions(
-            long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
+            long cacheMaxSize,
+            long cacheExpireMs,
+            int maxRetryTimes,
+            int jdbcReadBatchSize,
+            int jdbcReadBatchQueueSize,
+            int jdbcReadThreadSize,
+            boolean async) {
         this.cacheMaxSize = cacheMaxSize;
         this.cacheExpireMs = cacheExpireMs;
         this.maxRetryTimes = maxRetryTimes;
+        this.jdbcReadBatchSize = jdbcReadBatchSize;
+        this.jdbcReadBatchQueueSize = jdbcReadBatchQueueSize;
+        this.jdbcReadThreadSize = jdbcReadThreadSize;
+        this.async = async;
     }
 
     public long getCacheMaxSize() {
@@ -44,6 +59,22 @@ public class DorisLookupOptions implements Serializable {
         return maxRetryTimes;
     }
 
+    public int getJdbcReadBatchSize() {
+        return jdbcReadBatchSize;
+    }
+
+    public int getJdbcReadBatchQueueSize() {
+        return jdbcReadBatchQueueSize;
+    }
+
+    public int getJdbcReadThreadSize() {
+        return jdbcReadThreadSize;
+    }
+
+    public boolean isAsync() {
+        return async;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -52,7 +83,11 @@ public class DorisLookupOptions implements Serializable {
     public static class Builder {
         private long cacheMaxSize = -1L;
         private long cacheExpireMs = -1L;
-        private int maxRetryTimes = 3;
+        private int maxRetryTimes = 1;
+        private int jdbcReadBatchSize;
+        private int jdbcReadBatchQueueSize;
+        private int jdbcReadThreadSize;
+        private boolean async;
 
         /** optional, lookup cache max size, over this value, the old data will be eliminated. */
         public Builder setCacheMaxSize(long cacheMaxSize) {
@@ -72,9 +107,36 @@ public class DorisLookupOptions implements Serializable {
             return this;
         }
 
+        public Builder setJdbcReadBatchSize(int jdbcReadBatchSize){
+            this.jdbcReadBatchSize = jdbcReadBatchSize;
+            return this;
+        }
+
+        public Builder setJdbcReadBatchQueueSize(int jdbcReadBatchQueueSize){
+            this.jdbcReadBatchQueueSize = jdbcReadBatchQueueSize;
+            return this;
+        }
+
+        public Builder setJdbcReadThreadSize(int jdbcReadThreadSize){
+            this.jdbcReadThreadSize = jdbcReadThreadSize;
+            return this;
+        }
+
+        public Builder setAsync(boolean async){
+            this.async = async;
+            return this;
+        }
+
+
         public DorisLookupOptions build() {
             return new DorisLookupOptions(
-                    cacheMaxSize, cacheExpireMs, maxRetryTimes);
+                    cacheMaxSize,
+                    cacheExpireMs,
+                    maxRetryTimes,
+                    jdbcReadBatchSize,
+                    jdbcReadBatchQueueSize,
+                    jdbcReadThreadSize,
+                    async);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index 7d6963b..c9e36e3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -37,6 +37,11 @@ public class DorisOptions extends DorisConnectionOptions {
         this.tableIdentifier = tableIdentifier;
     }
 
+    public DorisOptions(String fenodes, String username, String password, String tableIdentifier, String jdbcUrl) {
+        super(fenodes, username, password, jdbcUrl);
+        this.tableIdentifier = tableIdentifier;
+    }
+
     public String getTableIdentifier() {
         return tableIdentifier;
     }
@@ -55,6 +60,8 @@ public class DorisOptions extends DorisConnectionOptions {
      */
     public static class Builder {
         private String fenodes;
+
+        private String jdbcUrl;
         private String username;
         private String password;
         private String tableIdentifier;
@@ -91,11 +98,18 @@ public class DorisOptions extends DorisConnectionOptions {
             return this;
         }
 
+        /**
+         * not required, fe jdbc url, for lookup query
+         */
+        public Builder setJdbcUrl(String jdbcUrl) {
+            this.jdbcUrl = jdbcUrl;
+            return this;
+        }
 
         public DorisOptions build() {
             checkNotNull(fenodes, "No fenodes supplied.");
             checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
-            return new DorisOptions(fenodes, username, password, tableIdentifier);
+            return new DorisOptions(fenodes, username, password, tableIdentifier, jdbcUrl);
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/JdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/JdbcConnectionProvider.java
new file mode 100644
index 0000000..3e840c0
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/JdbcConnectionProvider.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.connection;
+
+import java.sql.Connection;
+
+public interface JdbcConnectionProvider {
+
+    /**
+     * Get existing connection or establish an new one if there is none.
+     */
+    Connection getOrEstablishConnection() throws Exception;
+
+
+    /** Close possible existing connection. */
+    void closeConnection();
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
new file mode 100644
index 0000000..4d42cd6
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.connection;
+
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Objects;
+
+public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+    private static final long serialVersionUID = 1L;
+
+    private final DorisConnectionOptions options;
+
+    private transient Connection connection;
+
+    public SimpleJdbcConnectionProvider(DorisConnectionOptions options){
+        this.options = options;
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
+        if (connection != null && !connection.isClosed()) {
+            return connection;
+        }
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (ClassNotFoundException ex) {
+            LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver");
+            Class.forName("com.mysql.jdbc.Driver");
+        }
+        if (!Objects.isNull(options.getUsername())) {
+            connection = DriverManager.getConnection(options.getJdbcUrl(), options.getUsername(), options.getPassword());
+        } else {
+            connection = DriverManager.getConnection(options.getJdbcUrl());
+        }
+        return connection;
+    }
+
+    @Override
+    public void closeConnection() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                LOG.warn("JDBC connection close failed.", e);
+            } finally {
+                connection = null;
+            }
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 79c3b81..435c0f3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -200,7 +200,7 @@ public class DorisRowConverter implements Serializable {
                 return ((index, val) -> null);
             case CHAR:
             case VARCHAR:
-                return (index, val) -> val.getString(index);
+                return (index, val) -> val.getString(index).toString();
             case BOOLEAN:
                 return (index, val) -> val.getBoolean(index);
             case BINARY:
@@ -209,7 +209,7 @@ public class DorisRowConverter implements Serializable {
             case DECIMAL:
                 final int decimalPrecision = ((DecimalType) type).getPrecision();
                 final int decimalScale = ((DecimalType) type).getScale();
-                return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale);
+                return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
             case TINYINT:
                 return (index, val) -> val.getByte(index);
             case SMALLINT:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
new file mode 100644
index 0000000..9090783
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public class DorisJdbcLookupReader extends DorisLookupReader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisJdbcLookupReader.class);
+
+    private ExecutionPool pool;
+
+    private DorisRowConverter converter;
+
+    private DorisRowConverter keyConverter;
+
+    private LookupSchema schema;
+
+    public DorisJdbcLookupReader(DorisOptions options, DorisLookupOptions lookupOptions, LookupSchema lookupSchema) {
+        this.converter = new DorisRowConverter(lookupSchema.getFieldTypes());
+        this.pool = new ExecutionPool(options, lookupOptions);
+        this.schema = lookupSchema;
+        this.keyConverter = buildKeyConvert();
+    }
+
+    private DorisRowConverter buildKeyConvert() {
+        int[] keyIndex = schema.getKeyIndex();
+        DataType[] keyTypes = new DataType[keyIndex.length];
+        DataType[] fieldTypes = schema.getFieldTypes();
+        for (int i = 0; i < keyIndex.length; i++) {
+            keyTypes[i] = fieldTypes[keyIndex[i]];
+        }
+        return new DorisRowConverter(keyTypes);
+    }
+
+    @Override
+    public CompletableFuture<List<RowData>> asyncGet(RowData recordIn) throws IOException {
+        CompletableFuture<List<RowData>> result = new CompletableFuture<>();
+        Record record = convertRecord(recordIn);
+        try {
+            pool.get(new Get(record))
+                    .handleAsync((resultRow, throwable) -> {
+                        try {
+                            if (throwable != null) {
+                                result.completeExceptionally(throwable);
+                            } else {
+                                if (resultRow == null) {
+                                    result.complete(new ArrayList<>());
+                                } else {
+                                    //convert Record to RowData
+                                    List<RowData> rowDatas = convertRowDataList(resultRow);
+                                    result.complete(rowDatas);
+                                }
+                            }
+                        } catch (Throwable e) {
+                            result.completeExceptionally(e);
+                        }
+                        return null;
+                    });
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    private List<RowData> convertRowDataList(List<Record> records) {
+        List<RowData> results = new ArrayList<>();
+        for (Record record : records) {
+            RowData rowData = convertRowData(record);
+            results.add(rowData);
+        }
+        return results;
+    }
+
+    private RowData convertRowData(Record record) {
+        if (record == null) {
+            return null;
+        }
+        Object[] values = record.getValues();
+        GenericRowData rowData = converter.convertInternal(Arrays.asList(values));
+        return rowData;
+    }
+
+    private Record convertRecord(RowData recordIn) {
+        Record record = new Record(schema);
+        int[] keyIndex = schema.getKeyIndex();
+
+        for (int index = 0; index < keyIndex.length; index++) {
+            Object value = keyConverter.convertExternal(recordIn, index);
+            record.setObject(keyIndex[index], value);
+        }
+        return record;
+    }
+
+    @Override
+    public List<RowData> get(RowData record) throws IOException {
+        try {
+            return this.asyncGet(record).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
new file mode 100644
index 0000000..b6c5073
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public abstract class DorisLookupReader {
+
+    public abstract CompletableFuture<List<RowData>> asyncGet(RowData record) throws IOException;
+
+    public abstract List<RowData> get(RowData record) throws IOException;
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
new file mode 100644
index 0000000..c9638f5
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ExecutionPool implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutionPool.class);
+    private ActionWatcher readActionWatcher;
+    final ArrayBlockingQueue<Get> queue;
+    private AtomicBoolean started; //determine whether the executionPool is running
+    private AtomicBoolean workerStated; //determine whether the worker is running
+    ExecutorService actionWatcherExecutorService;
+    ExecutorService workerExecutorService;
+    ThreadFactory workerThreadFactory;
+    ThreadFactory actionWatcherThreadFactory;
+    private Worker[] workers;
+
+    private Semaphore semaphore;
+
+    private final int jdbcReadThreadSize;
+
+    public ExecutionPool(DorisOptions options, DorisLookupOptions lookupOptions) {
+        started = new AtomicBoolean(false);
+        workerStated = new AtomicBoolean(false);
+        workerThreadFactory = new DefaultThreadFactory("worker");
+        actionWatcherThreadFactory = new DefaultThreadFactory("action-watcher");
+        this.queue = new ArrayBlockingQueue<>(lookupOptions.getJdbcReadBatchQueueSize());
+        this.readActionWatcher = new ActionWatcher(lookupOptions.getJdbcReadBatchSize());
+        this.workers = new Worker[lookupOptions.getJdbcReadThreadSize()];
+        for (int i = 0; i < workers.length; ++i) {
+            workers[i] = new Worker(workerStated, options, lookupOptions, i);
+        }
+        this.jdbcReadThreadSize = lookupOptions.getJdbcReadThreadSize();
+        start();
+    }
+
+    private void start() {
+        if (started.compareAndSet(false, true)) {
+            workerStated.set(true);
+            workerExecutorService = new ThreadPoolExecutor(workers.length, workers.length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), workerThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+            actionWatcherExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), actionWatcherThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+            for (int i = 0; i < workers.length; ++i) {
+                workerExecutorService.execute(workers[i]);
+            }
+            actionWatcherExecutorService.execute(readActionWatcher);
+            this.semaphore = new Semaphore(jdbcReadThreadSize);
+        }
+    }
+
+    public CompletableFuture<List<Record>> get(Get get) {
+        appendGet(get);
+        return get.getFuture();
+    }
+
+    private void appendGet(Get get) {
+        get.setFuture(new CompletableFuture<>());
+        try {
+            if (!queue.offer(get, 10000L, TimeUnit.MILLISECONDS)) {
+                get.getFuture().completeExceptionally(new TimeoutException());
+            }
+        } catch (InterruptedException e) {
+            get.getFuture().completeExceptionally(e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (started.compareAndSet(true, false)) {
+            actionWatcherExecutorService.shutdown();
+            workerStated.set(false);
+            workerExecutorService.shutdown();
+            this.semaphore = null;
+        }
+    }
+
+    public boolean submit(GetAction action) {
+        //if has semaphore, try to obtain the semaphore, otherwise return submit failure
+        if (semaphore != null) {
+            try {
+                boolean acquire = semaphore.tryAcquire(2000L, TimeUnit.MILLISECONDS);
+                if (!acquire) {
+                    return false;
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException("get semaphore be interrupt");
+            }
+            action.setSemaphore(semaphore);
+        }
+
+        //try to submit to worker
+        for (int i = 0; i < workers.length; ++i) {
+            Worker worker = workers[i];
+            if (worker.offer(action)) {
+                return true;
+            }
+        }
+        //If submit fails, it will be released, and if successful, the worker will be responsible for the release
+        if (semaphore != null) {
+            semaphore.release();
+        }
+        return false;
+    }
+
+    /**
+     * monitor the query action
+     * and submit it to the worker as soon as the data arrives
+     */
+    class ActionWatcher implements Runnable {
+        private int batchSize;
+
+        public ActionWatcher(int batchSize) {
+            this.batchSize = batchSize;
+        }
+
+        @Override
+        public void run() {
+            LOG.info("action watcher start");
+            List<Get> recordList = new ArrayList<>(batchSize);
+            while (started.get()) {
+                try {
+                    recordList.clear();
+                    Get firstGet = queue.poll(2, TimeUnit.SECONDS);
+                    if (firstGet != null) {
+                        recordList.add(firstGet);
+                        queue.drainTo(recordList, batchSize - 1);
+                        Map<String, List<Get>> getsByTable = new HashMap<>();
+                        for (Get get : recordList) {
+                            List<Get> list = getsByTable.computeIfAbsent(get.getRecord().getTableIdentifier(), (s) -> new ArrayList<>());
+                            list.add(get);
+                        }
+                        for (Map.Entry<String, List<Get>> entry : getsByTable.entrySet()) {
+                            GetAction getAction = new GetAction(entry.getValue());
+                            while (!submit(getAction)) {
+                            }
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    break;
+                } catch (Exception e) {
+                    for (Get get : recordList) {
+                        if (!get.getFuture().isDone()) {
+                            get.getFuture().completeExceptionally(e);
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "ActionWatcher{" +
+                    "batchSize=" + batchSize +
+                    '}';
+        }
+    }
+
+    static class DefaultThreadFactory implements ThreadFactory {
+        private static final AtomicInteger poolNumber = new AtomicInteger(1);
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        DefaultThreadFactory(String name) {
+            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+            t.setDaemon(false);
+            return t;
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Get.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Get.java
new file mode 100644
index 0000000..ba3efcc
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Get.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class Get {
+    Record record;
+    CompletableFuture<List<Record>> future;
+
+    public Get(Record record) {
+        this.record = record;
+    }
+
+    public Record getRecord() {
+        return record;
+    }
+
+    public CompletableFuture<List<Record>> getFuture() {
+        return this.future;
+    }
+
+    public void setFuture(CompletableFuture<List<Record>> future) {
+        this.future = future;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/GetAction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/GetAction.java
new file mode 100644
index 0000000..db59ec7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/GetAction.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+public class GetAction {
+
+    List<Get> getList;
+
+    Semaphore semaphore;
+
+    public GetAction(List<Get> getList) {
+        this.getList = getList;
+    }
+
+    public List<Get> getGetList() {
+        return getList;
+    }
+
+    public Semaphore getSemaphore() {
+        return semaphore;
+    }
+
+    public void setSemaphore(Semaphore semaphore) {
+        this.semaphore = semaphore;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupSchema.java
new file mode 100644
index 0000000..698427f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupSchema.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class LookupSchema implements Serializable {
+
+    private String tableIdentifier;
+    private String[] selectFields;
+    private String[] conditionFields;
+    private DataType[] fieldTypes;
+    private int[] keyIndex;
+
+    public LookupSchema(String tableIdentifier, String[] selectFields, DataType[] fieldTypes, String[] conditionFields, int[] keyIndex) {
+        this.tableIdentifier = tableIdentifier;
+        this.selectFields = selectFields;
+        this.fieldTypes = fieldTypes;
+        this.conditionFields = conditionFields;
+        this.keyIndex = keyIndex;
+    }
+
+    public String getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public String[] getSelectFields() {
+        return selectFields;
+    }
+
+    public String[] getConditionFields() {
+        return conditionFields;
+    }
+
+    public DataType[] getFieldTypes() {
+        return fieldTypes;
+    }
+
+    public int[] getKeyIndex() {
+        return keyIndex;
+    }
+
+    public void setTableIdentifier(String tableIdentifier) {
+        this.tableIdentifier = tableIdentifier;
+    }
+
+    public void setSelectFields(String[] selectFields) {
+        this.selectFields = selectFields;
+    }
+
+    public void setConditionFields(String[] conditionFields) {
+        this.conditionFields = conditionFields;
+    }
+
+    public void setFieldTypes(DataType[] fieldTypes) {
+        this.fieldTypes = fieldTypes;
+    }
+
+    public void setKeyIndex(int[] keyIndex) {
+        this.keyIndex = keyIndex;
+    }
+
+    @Override
+    public String toString() {
+        return "LookupSchema{" +
+                "tableIdentifier='" + tableIdentifier + '\'' +
+                ", selectFields=" + Arrays.toString(selectFields) +
+                ", conditionFields=" + Arrays.toString(conditionFields) +
+                ", fieldTypes=" + Arrays.toString(fieldTypes) +
+                ", keyIndex=" + Arrays.toString(keyIndex) +
+                '}';
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Record.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Record.java
new file mode 100644
index 0000000..172d892
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Record.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * record
+ */
+public class Record implements Serializable {
+
+    LookupSchema schema;
+    Object[] values;
+
+    public Record(LookupSchema schema) {
+        this.schema = schema;
+        values = new Object[schema.getFieldTypes().length];
+    }
+
+    public LookupSchema getSchema() {
+        return schema;
+    }
+
+    public Object getObject(int index) {
+        return values[index];
+    }
+
+    public void setObject(int index, Object obj) {
+        values[index] = obj;
+    }
+
+    public String getTableIdentifier() {
+        return schema.getTableIdentifier();
+    }
+
+    public String[] getSelectFields() {
+        return schema.getSelectFields();
+    }
+
+    public String[] getConditionFields() {
+        return schema.getConditionFields();
+    }
+
+    public DataType[] getFieldTypes() {
+        return schema.getFieldTypes();
+    }
+
+    public int[] getKeyIndex() {
+        return schema.getKeyIndex();
+    }
+
+    public Object[] getValues() {
+        return values;
+    }
+
+    @Override
+    public String toString() {
+        return "Record{" +
+                "schema=" + schema +
+                ", values=" + Arrays.toString(values) +
+                '}';
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/RecordKey.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/RecordKey.java
new file mode 100644
index 0000000..805a03f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/RecordKey.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import java.lang.reflect.Array;
+
+public class RecordKey {
+    Record record;
+    int[] keys;
+
+    public RecordKey(Record record) {
+        this.record = record;
+        keys = record.getKeyIndex();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        RecordKey recordKey = (RecordKey) o;
+        if (record == recordKey.record) {
+            return true;
+        }
+
+        if (record.getKeyIndex().length != recordKey.record.getKeyIndex().length) {
+            return false;
+        }
+        if (record.getKeyIndex().length == 0) {
+            return false;
+        }
+        for (int i : record.getKeyIndex()) {
+            Object left = record.getObject(i);
+            Object right = recordKey.record.getObject(i);
+            if (!equals(left, right)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        return hash(record, keys);
+    }
+
+    public static boolean equals(Object obj0, Object obj1) {
+        if (obj0 == null) {
+            if (obj1 != null) {
+                return false;
+            } else {
+                return true;
+            }
+        } else {
+            if (obj1 == null) {
+                return false;
+            }
+        }
+
+        if (obj0.getClass().isArray()) {
+            if (obj1.getClass().isArray()) {
+                int length0 = Array.getLength(obj0);
+                int length1 = Array.getLength(obj1);
+                if (length0 != length1) {
+                    return false;
+                } else {
+                    for (int i = 0; i < length0; ++i) {
+                        Object child0 = Array.get(obj0, i);
+                        Object child1 = Array.get(obj1, i);
+                        if (!equals(child0, child1)) {
+                            return false;
+                        }
+                    }
+                }
+            } else {
+                return false;
+            }
+        } else {
+            if (obj1.getClass().isArray()) {
+                return false;
+            } else {
+                return obj0.equals(obj1);
+            }
+        }
+        return true;
+    }
+
+    public static int hash(Record record, int[] indexes) {
+        int hash = 0;
+        boolean first = true;
+        for (int i : indexes) {
+            if (first) {
+                hash = hashCode(record.getObject(i));
+            } else {
+                hash ^= hashCode(record.getObject(i));
+            }
+            first = false;
+        }
+        return hash;
+    }
+
+
+    public static int hashCode(Object obj) {
+        if (obj == null) {
+            return 0;
+        }
+        if (obj.getClass().isArray()) {
+            int hash = 0;
+            int length = Array.getLength(obj);
+            for (int i = 0; i < length; ++i) {
+                Object child = Array.get(obj, i);
+                hash = hash * 31 + (child == null ? 0 : child.hashCode());
+            }
+            return hash;
+        } else {
+            return obj.hashCode();
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
new file mode 100644
index 0000000..e29bfd1
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.connection.JdbcConnectionProvider;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Worker implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+    private final String name;
+    private final AtomicBoolean started;
+    private final JdbcConnectionProvider jdbcConnectionProvider;
+    private ArrayBlockingQueue<GetAction> queue = new ArrayBlockingQueue(1);
+    private final int maxRetryTimes;
+    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+
+    public Worker(AtomicBoolean started, DorisOptions options, DorisLookupOptions lookupOptions, int index) {
+        this.started = started;
+        this.name = "Worker-" + index;
+        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    public boolean offer(GetAction action) {
+        if (exception.get() != null) {
+            throw new DorisRuntimeException(exception.get());
+        }
+        return queue.offer(action);
+    }
+
+    @Override
+    public void run() {
+        LOG.info("worker:{} start", this);
+        while (started.get()) {
+            try {
+                GetAction action = queue.poll(2000L, TimeUnit.MILLISECONDS);
+                if (action != null) {
+                    try{
+                        handle(action);
+                    }finally {
+                        if (action.getSemaphore() != null) {
+                            action.getSemaphore().release();
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                LOG.error("worker running error", e);
+                exception.set(e);
+                break;
+            }
+        }
+        LOG.info("worker:{} stop", this);
+        jdbcConnectionProvider.closeConnection();
+    }
+
+    private void handle(GetAction action) {
+        if (action.getGetList().size() <= 0) {
+            return;
+        }
+        LookupSchema schema = action.getGetList().get(0).getRecord().getSchema();
+        List<Get> recordList = action.getGetList();
+
+        StringBuilder sb = new StringBuilder();
+        boolean first;
+        for (int i = 0; i < recordList.size(); i++) {
+            if (i > 0) {
+                sb.append(" union all ");
+            }
+            first = true;
+            appendSelect(sb, schema);
+            sb.append(" where ( ");
+            for (String condition : schema.getConditionFields()) {
+                if (!first) {
+                    sb.append(" and ");
+                }
+                first = false;
+                sb.append(quoteIdentifier(condition)).append("=?");
+            }
+            sb.append(" ) ");
+        }
+
+        String sql = sb.toString();
+        LOG.debug("query sql is {}", sql);
+        try {
+            Map<RecordKey, List<Record>> resultRecordMap = executeQuery(sql, recordList, schema);
+            for (Get get : recordList) {
+                Record record = get.getRecord();
+                if (get.getFuture() != null) {
+                    RecordKey key = new RecordKey(record);
+                    List<Record> result = resultRecordMap.get(key);
+                    get.getFuture().complete(result);
+                }
+            }
+        } catch (Exception e) {
+            for (Get get : recordList) {
+                if (get.getFuture() != null && !get.getFuture().isDone()) {
+                    get.getFuture().completeExceptionally(e);
+                }
+            }
+        }
+    }
+
+    private void appendSelect(StringBuilder sb, LookupSchema schema){
+        String[] selectFields = schema.getSelectFields();
+        sb.append("select ");
+        for (int i = 0; i < selectFields.length; i++) {
+            if (i > 0) {
+                sb.append(",");
+            }
+            String columnName = selectFields[i];
+            sb.append(quoteIdentifier(columnName));
+        }
+        sb.append(" from ").append(schema.getTableIdentifier());
+    }
+
+    private Map<RecordKey, List<Record>> executeQuery(String sql,
+                                                      List<Get> recordList,
+                                                      LookupSchema schema) {
+        Map<RecordKey, List<Record>> resultRecordMap = new HashMap<>();
+        //retry strategy
+        for (int retry = 0; retry <= maxRetryTimes; retry++) {
+            resultRecordMap = new HashMap<>();
+            try {
+                Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+                try (PreparedStatement ps = conn.prepareStatement(sql)) {
+                    int paramIndex = 0;
+                    for (Get get : recordList) {
+                        Record record = get.getRecord();
+                        for (int keyIndex : schema.getKeyIndex()) {
+                            ps.setObject(++paramIndex, record.getObject(keyIndex));
+                        }
+                    }
+
+                    try (ResultSet rs = ps.executeQuery()) {
+                        while (rs.next()) {
+                            Record record = new Record(schema);
+                            for (int index = 0; index < schema.getFieldTypes().length; index++) {
+                                record.setObject(index, rs.getObject(index + 1));
+                            }
+                            List<Record> records = resultRecordMap.computeIfAbsent(new RecordKey(record), m -> new ArrayList<>());
+                            records.add(record);
+                        }
+                    }
+                }
+                return resultRecordMap;
+            } catch (Exception e) {
+                LOG.error(String.format("query doris error, retry times = %d", retry), e);
+                if (retry >= maxRetryTimes) {
+                    throw new RuntimeException(e);
+                }
+                try {
+                    Thread.sleep(1000 * retry);
+                } catch (InterruptedException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+        }
+        return resultRecordMap;
+    }
+
+    public String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
+    public String toString() {
+        return name;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 391df7d..a637967 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -42,6 +42,8 @@ public class DorisConfigOptions {
     public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name.");
     public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password.");
 
+    public static final ConfigOption<String> JDBC_URL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url address.");
+
     // source config options
     public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
             .key("doris.read.field")
@@ -123,9 +125,34 @@ public class DorisConfigOptions {
     public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
             ConfigOptions.key("lookup.max-retries")
                     .intType()
-                    .defaultValue(3)
+                    .defaultValue(1)
                     .withDescription("The max retry times if lookup database failed.");
 
+    public static final ConfigOption<Integer> LOOKUP_JDBC_READ_BATCH_SIZE =
+            ConfigOptions.key("lookup.jdbc.read.batch.size")
+                    .intType()
+                    .defaultValue(128)
+                    .withDescription("when dimension table query, save the maximum number of batches.");
+
+    public static final ConfigOption<Integer> LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE =
+            ConfigOptions.key("lookup.jdbc.read.batch.queue-size")
+                    .intType()
+                    .defaultValue(256)
+                    .withDescription("dimension table query request buffer queue size.");
+
+    public static final ConfigOption<Integer> LOOKUP_JDBC_READ_THREAD_SIZE =
+            ConfigOptions.key("lookup.jdbc.read.thread-size")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("the number of threads for dimension table query, each query occupies a JDBC connection");
+
+    public static final ConfigOption<Boolean> LOOKUP_JDBC_ASYNC =
+            ConfigOptions.key("lookup.jdbc.async")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("whether to set async lookup");
+
+
     // sink config options
     public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
             .key("sink.enable-2pc")
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index df7b867..81c756c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -49,8 +49,13 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETR
 import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
 import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
 import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS;
 import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_TTL;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_ASYNC;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_BATCH_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_THREAD_SIZE;
 import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
@@ -95,6 +100,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(TABLE_IDENTIFIER);
         options.add(USERNAME);
         options.add(PASSWORD);
+        options.add(JDBC_URL);
 
         options.add(DORIS_READ_FIELD);
         options.add(DORIS_FILTER_QUERY);
@@ -110,6 +116,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(LOOKUP_CACHE_MAX_ROWS);
         options.add(LOOKUP_CACHE_TTL);
         options.add(LOOKUP_MAX_RETRIES);
+        options.add(LOOKUP_JDBC_ASYNC);
+        options.add(LOOKUP_JDBC_READ_BATCH_SIZE);
+        options.add(LOOKUP_JDBC_READ_THREAD_SIZE);
+        options.add(LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE);
 
         options.add(SINK_CHECK_INTERVAL);
         options.add(SINK_ENABLE_2PC);
@@ -148,6 +158,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         final String fenodes = readableConfig.get(FENODES);
         final DorisOptions.Builder builder = DorisOptions.builder()
                 .setFenodes(fenodes)
+                .setJdbcUrl(readableConfig.get(JDBC_URL))
                 .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
 
         readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
@@ -204,6 +215,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         builder.setCacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());
         builder.setCacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));
         builder.setMaxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));
+        builder.setJdbcReadBatchSize(readableConfig.get(LOOKUP_JDBC_READ_BATCH_SIZE));
+        builder.setJdbcReadBatchQueueSize(readableConfig.get(LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE));
+        builder.setJdbcReadThreadSize(readableConfig.get(LOOKUP_JDBC_READ_THREAD_SIZE));
+        builder.setAsync(readableConfig.get(LOOKUP_JDBC_ASYNC));
         return builder.build();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index c5e47fa..0fb096b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
@@ -126,19 +127,31 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
         DataType physicalRowDataType = physicalSchema.toRowDataType();
         String[] keyNames = new String[context.getKeys().length];
+        int[] keyIndexs = new int[context.getKeys().length];
         for (int i = 0; i < keyNames.length; i++) {
             int[] innerKeyArr = context.getKeys()[i];
             keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
+            keyIndexs[i] = innerKeyArr[0];
+        }
+        if (lookupOptions.isAsync()) {
+            return AsyncTableFunctionProvider.of(
+                    new DorisRowDataAsyncLookupFunction(
+                            options,
+                            lookupOptions,
+                            DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
+                            keyNames,
+                            keyIndexs));
+        } else {
+            return TableFunctionProvider.of(
+                    new DorisRowDataJdbcLookupFunction(
+                            options,
+                            lookupOptions,
+                            DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+                            DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
+                            keyNames,
+                            keyIndexs));
         }
-
-        return TableFunctionProvider.of(
-                new DorisRowDataLookupFunction(
-                        options,
-                        readOptions,
-                        lookupOptions,
-                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
-                        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
-                        keyNames));
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
new file mode 100644
index 0000000..b3c5f43
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
+import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupSchema;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class DorisRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataAsyncLookupFunction.class);
+    private final DorisOptions options;
+    private final DorisLookupOptions lookupOptions;
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private transient Cache<RowData, List<RowData>> cache;
+    private DorisLookupReader lookupReader;
+    private LookupSchema lookupSchema;
+
+    public DorisRowDataAsyncLookupFunction(DorisOptions options,
+                                          DorisLookupOptions lookupOptions,
+                                          String[] selectFields,
+                                          DataType[] fieldTypes,
+                                          String[] conditionFields,
+                                          int[] keyIndex) {
+        Preconditions.checkNotNull(options.getJdbcUrl(), "jdbc-url is required in jdbc mode lookup");
+        this.options = options;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.lookupOptions = lookupOptions;
+        this.lookupSchema = new LookupSchema(options.getTableIdentifier(), selectFields, fieldTypes, conditionFields, keyIndex);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
+                ? null
+                : CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+        this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+    }
+
+    /**
+     * This is a lookup method which is called by Flink framework in runtime.
+     *
+     */
+    public void eval(CompletableFuture<Collection<RowData>> future, Object... keys) throws IOException {
+        RowData keyRow = GenericRowData.of(keys);
+        if (cache != null) {
+            List<RowData> cachedRows = cache.getIfPresent(keyRow);
+            if (cachedRows != null) {
+                future.complete(cachedRows);
+            }
+            return;
+        }
+        CompletableFuture<List<RowData>> resultFuture = lookupReader.asyncGet(keyRow);
+        resultFuture.handleAsync(
+                (resultRows, throwable) -> {
+                    try {
+                        if (throwable != null) {
+                            future.completeExceptionally(throwable);
+                        } else {
+                            if (resultRows == null || resultRows.isEmpty()) {
+                                if(cache != null){
+                                    cache.put(keyRow, Collections.emptyList());
+                                }
+                                future.complete(Collections.emptyList());
+                            } else {
+                                if(cache != null){
+                                    cache.put(keyRow, resultRows);
+                                }
+                                future.complete(resultRows);
+                            }
+                        }
+                    } catch (Throwable t) {
+                        future.completeExceptionally(t);
+                    }
+                    return null;
+                });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+
+    @VisibleForTesting
+    public Cache<RowData, List<RowData>> getCache() {
+        return cache;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
new file mode 100644
index 0000000..93a0059
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
+import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupSchema;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * use jdbc to query
+ */
+public class DorisRowDataJdbcLookupFunction extends TableFunction<RowData> {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataJdbcLookupFunction.class);
+    private final DorisOptions options;
+    private final DorisLookupOptions lookupOptions;
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private transient Cache<RowData, List<RowData>> cache;
+    private DorisLookupReader lookupReader;
+    private LookupSchema lookupSchema;
+
+    public DorisRowDataJdbcLookupFunction(DorisOptions options,
+                                          DorisLookupOptions lookupOptions,
+                                          String[] selectFields,
+                                          DataType[] fieldTypes,
+                                          String[] conditionFields,
+                                          int[] keyIndex) {
+        Preconditions.checkNotNull(options.getJdbcUrl(), "jdbc-url is required in jdbc mode lookup");
+        this.options = options;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.lookupOptions = lookupOptions;
+        this.lookupSchema = new LookupSchema(options.getTableIdentifier(), selectFields, fieldTypes, conditionFields, keyIndex);
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        super.open(context);
+        this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
+                ? null
+                : CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+        this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+    }
+
+    /**
+     * This is a lookup method which is called by Flink framework in runtime.
+     *
+     * @param keys lookup keys
+     */
+    public void eval(Object... keys) throws IOException {
+        RowData keyRow = GenericRowData.of(keys);
+        if (cache != null) {
+            List<RowData> cachedRows = cache.getIfPresent(keyRow);
+            if (cachedRows != null) {
+                for (RowData cachedRow : cachedRows) {
+                    collect(cachedRow);
+                }
+                return;
+            }
+        }
+        queryRecord(keyRow);
+    }
+
+    private void queryRecord(RowData keyRow) throws IOException {
+        List<RowData> rowData = lookupReader.get(keyRow);
+        if(rowData == null){
+            rowData = Collections.emptyList();
+        }
+        if(cache != null){
+            cache.put(keyRow, rowData);
+        }
+        rowData.forEach(this::collect);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+    }
+
+    @VisibleForTesting
+    public Cache<RowData, List<RowData>> getCache() {
+        return cache;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinExample.java
new file mode 100644
index 0000000..d3fbf87
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinExample.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.UUID;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+public class LookupJoinExample {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.enableCheckpointing(30000);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        DataStreamSource<Tuple2<Integer,String>> source = env.addSource(new SourceFunction<Tuple2<Integer,String>>() {
+            private Integer id = 1;
+            @Override
+            public void run(SourceContext<Tuple2<Integer,String>> out) throws Exception {
+                while(true){
+                    Tuple2<Integer,String> record = new Tuple2<>(id++, UUID.randomUUID().toString());
+                    out.collect(record);
+                    Thread.sleep(1000);
+                }
+            }
+            @Override
+            public void cancel() {
+            }
+        });
+        tEnv.createTemporaryView("doris_source",source,$("id"),$("uuid"),$("process_time").proctime());
+
+        tEnv.executeSql(
+                "CREATE TABLE lookup_dim_tbl (" +
+                        "  c_custkey int," +
+                        "  c_name string," +
+                        "  c_address string," +
+                        "  c_city string," +
+                        "  c_nation string," +
+                        "  c_region string," +
+                        "  c_phone string," +
+                        "  c_mktsegment string" +
+                        ") " +
+                        "WITH (\n" +
+                        "  'connector' = 'doris',\n" +
+                        "  'fenodes' = '127.0.0.1:8030',\n" +
+                        "  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n" +
+                        "  'table.identifier' = 'ssb.customer',\n" +
+                        "  'lookup.jdbc.async' = 'true',\n" +
+                        "  'username' = 'root',\n" +
+                        "  'password' = ''\n" +
+                        ")");
+
+
+        Table table = tEnv.sqlQuery("select a.id,a.uuid,b.c_name,b.c_nation,b.c_phone  from doris_source a " +
+                "left join lookup_dim_tbl FOR SYSTEM_TIME AS OF a.process_time b " +
+                "ON  a.id = b.c_custkey");
+
+        tEnv.toRetractStream(table, Row.class).print();
+        env.execute();
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/RecordTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/RecordTest.java
new file mode 100644
index 0000000..c6dc297
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/RecordTest.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RecordTest {
+
+    private LookupSchema schema;
+
+    @Before
+    public void before(){
+        String tableIdentifier = "db.tbl";
+        String[] selectFields = new String[]{"a","b","c"};
+        DataType[] fieldTypes = new DataType[]{DataTypes.INT(),DataTypes.STRING(),DataTypes.DOUBLE()};
+        schema = new LookupSchema(tableIdentifier,selectFields,fieldTypes,null,null);
+    }
+
+    @Test
+    public void testLookupOneKey(){
+        String[] conditionFields = new String[]{"a"};
+        int[] keyIndex = new int[]{1};
+        Object[] values = new Object[schema.getFieldTypes().length];
+        values[0] = 1001;
+        Record record = appendValues(conditionFields, keyIndex, values);
+
+        Map<RecordKey, Record> map = new HashMap<>();
+        map.put(new RecordKey(record), record);
+        Assert.assertTrue(map.get(new RecordKey(record)) != null);
+    }
+
+    @Test
+    public void testLookupTwoKey(){
+        String[] conditionFields = new String[]{"a","b"};
+        int[] keyIndex = new int[]{1,2};
+        Object[] values = new Object[schema.getFieldTypes().length];
+        values[0] = 1001;
+        values[1] = "doris";
+        Record record = appendValues(conditionFields, keyIndex, values);
+
+        Map<RecordKey, Record> map = new HashMap<>();
+        map.put(new RecordKey(record), record);
+        Assert.assertTrue(map.get(new RecordKey(record)) != null);
+    }
+
+    @Test
+    public void testLookupOnlyTwoKey(){
+        String[] conditionFields = new String[]{"b"};
+        int[] keyIndex = new int[]{2};
+        Object[] values = new Object[schema.getFieldTypes().length];
+        values[0] = "doris";
+        Record record = appendValues(conditionFields, keyIndex, values);
+
+        Map<RecordKey, Record> map = new HashMap<>();
+        map.put(new RecordKey(record), record);
+        Assert.assertTrue(map.get(new RecordKey(record)) != null);
+    }
+
+    private Record appendValues(String[] conditionFields, int[] keyIndex, Object[] values){
+        schema.setKeyIndex(keyIndex);
+        schema.setConditionFields(conditionFields);
+        Record record = new Record(schema);
+        for(int i=0;i<schema.getFieldTypes().length;i++){
+            record.setObject(i,values);
+        }
+        return record;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org