You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2023/01/17 01:45:24 UTC
[doris-flink-connector] branch master updated: [improve] improve lookup join to async and batch (#97)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 522bbf4 [improve] improve lookup join to async and batch (#97)
522bbf4 is described below
commit 522bbf41b29c88911373660a674e183a059714a3
Author: wudi <67...@qq.com>
AuthorDate: Tue Jan 17 09:45:19 2023 +0800
[improve] improve lookup join to async and batch (#97)
* improve lookup join
---
.../doris/flink/cfg/DorisConnectionOptions.java | 19 +-
.../apache/doris/flink/cfg/DorisLookupOptions.java | 68 ++++++-
.../org/apache/doris/flink/cfg/DorisOptions.java | 16 +-
.../flink/connection/JdbcConnectionProvider.java | 33 ++++
.../connection/SimpleJdbcConnectionProvider.java | 74 ++++++++
.../converter/DorisRowConverter.java | 4 +-
.../doris/flink/lookup/DorisJdbcLookupReader.java | 132 +++++++++++++
.../doris/flink/lookup/DorisLookupReader.java | 31 +++
.../apache/doris/flink/lookup/ExecutionPool.java | 209 +++++++++++++++++++++
.../java/org/apache/doris/flink/lookup/Get.java | 42 +++++
.../org/apache/doris/flink/lookup/GetAction.java | 44 +++++
.../apache/doris/flink/lookup/LookupSchema.java | 91 +++++++++
.../java/org/apache/doris/flink/lookup/Record.java | 81 ++++++++
.../org/apache/doris/flink/lookup/RecordKey.java | 138 ++++++++++++++
.../java/org/apache/doris/flink/lookup/Worker.java | 200 ++++++++++++++++++++
.../doris/flink/table/DorisConfigOptions.java | 29 ++-
.../flink/table/DorisDynamicTableFactory.java | 15 ++
.../doris/flink/table/DorisDynamicTableSource.java | 31 ++-
.../table/DorisRowDataAsyncLookupFunction.java | 128 +++++++++++++
.../table/DorisRowDataJdbcLookupFunction.java | 120 ++++++++++++
.../doris/flink/lookup/LookupJoinExample.java | 86 +++++++++
.../org/apache/doris/flink/lookup/RecordTest.java | 90 +++++++++
22 files changed, 1664 insertions(+), 17 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 9b2187c..00abd52 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -30,6 +30,7 @@ public class DorisConnectionOptions implements Serializable {
protected final String fenodes;
protected final String username;
protected final String password;
+ protected String jdbcUrl;
public DorisConnectionOptions(String fenodes, String username, String password) {
this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty");
@@ -37,6 +38,11 @@ public class DorisConnectionOptions implements Serializable {
this.password = password;
}
+ public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl){
+ this(fenodes,username,password);
+ this.jdbcUrl = jdbcUrl;
+ }
+
public String getFenodes() {
return fenodes;
}
@@ -49,6 +55,10 @@ public class DorisConnectionOptions implements Serializable {
return password;
}
+ public String getJdbcUrl(){
+ return jdbcUrl;
+ }
+
/**
* Builder for {@link DorisConnectionOptions}.
*/
@@ -57,6 +67,8 @@ public class DorisConnectionOptions implements Serializable {
private String username;
private String password;
+ private String jdbcUrl;
+
public DorisConnectionOptionsBuilder withFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
@@ -72,8 +84,13 @@ public class DorisConnectionOptions implements Serializable {
return this;
}
+ public DorisConnectionOptionsBuilder withJdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ return this;
+ }
+
public DorisConnectionOptions build() {
- return new DorisConnectionOptions(fenodes, username, password);
+ return new DorisConnectionOptions(fenodes, username, password, jdbcUrl);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
index f0f7eb4..a1d8f30 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisLookupOptions.java
@@ -24,12 +24,27 @@ public class DorisLookupOptions implements Serializable {
private final long cacheMaxSize;
private final long cacheExpireMs;
private final int maxRetryTimes;
+ private final int jdbcReadBatchSize;
+ private final int jdbcReadBatchQueueSize;
+ private final int jdbcReadThreadSize;
+
+ private final boolean async;
public DorisLookupOptions(
- long cacheMaxSize, long cacheExpireMs, int maxRetryTimes) {
+ long cacheMaxSize,
+ long cacheExpireMs,
+ int maxRetryTimes,
+ int jdbcReadBatchSize,
+ int jdbcReadBatchQueueSize,
+ int jdbcReadThreadSize,
+ boolean async) {
this.cacheMaxSize = cacheMaxSize;
this.cacheExpireMs = cacheExpireMs;
this.maxRetryTimes = maxRetryTimes;
+ this.jdbcReadBatchSize = jdbcReadBatchSize;
+ this.jdbcReadBatchQueueSize = jdbcReadBatchQueueSize;
+ this.jdbcReadThreadSize = jdbcReadThreadSize;
+ this.async = async;
}
public long getCacheMaxSize() {
@@ -44,6 +59,22 @@ public class DorisLookupOptions implements Serializable {
return maxRetryTimes;
}
+ public int getJdbcReadBatchSize() {
+ return jdbcReadBatchSize;
+ }
+
+ public int getJdbcReadBatchQueueSize() {
+ return jdbcReadBatchQueueSize;
+ }
+
+ public int getJdbcReadThreadSize() {
+ return jdbcReadThreadSize;
+ }
+
+ public boolean isAsync() {
+ return async;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -52,7 +83,11 @@ public class DorisLookupOptions implements Serializable {
public static class Builder {
private long cacheMaxSize = -1L;
private long cacheExpireMs = -1L;
- private int maxRetryTimes = 3;
+ private int maxRetryTimes = 1;
+ private int jdbcReadBatchSize;
+ private int jdbcReadBatchQueueSize;
+ private int jdbcReadThreadSize;
+ private boolean async;
/** optional, lookup cache max size, over this value, the old data will be eliminated. */
public Builder setCacheMaxSize(long cacheMaxSize) {
@@ -72,9 +107,36 @@ public class DorisLookupOptions implements Serializable {
return this;
}
+ public Builder setJdbcReadBatchSize(int jdbcReadBatchSize){
+ this.jdbcReadBatchSize = jdbcReadBatchSize;
+ return this;
+ }
+
+ public Builder setJdbcReadBatchQueueSize(int jdbcReadBatchQueueSize){
+ this.jdbcReadBatchQueueSize = jdbcReadBatchQueueSize;
+ return this;
+ }
+
+ public Builder setJdbcReadThreadSize(int jdbcReadThreadSize){
+ this.jdbcReadThreadSize = jdbcReadThreadSize;
+ return this;
+ }
+
+ public Builder setAsync(boolean async){
+ this.async = async;
+ return this;
+ }
+
+
public DorisLookupOptions build() {
return new DorisLookupOptions(
- cacheMaxSize, cacheExpireMs, maxRetryTimes);
+ cacheMaxSize,
+ cacheExpireMs,
+ maxRetryTimes,
+ jdbcReadBatchSize,
+ jdbcReadBatchQueueSize,
+ jdbcReadThreadSize,
+ async);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index 7d6963b..c9e36e3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -37,6 +37,11 @@ public class DorisOptions extends DorisConnectionOptions {
this.tableIdentifier = tableIdentifier;
}
+ public DorisOptions(String fenodes, String username, String password, String tableIdentifier, String jdbcUrl) {
+ super(fenodes, username, password, jdbcUrl);
+ this.tableIdentifier = tableIdentifier;
+ }
+
public String getTableIdentifier() {
return tableIdentifier;
}
@@ -55,6 +60,8 @@ public class DorisOptions extends DorisConnectionOptions {
*/
public static class Builder {
private String fenodes;
+
+ private String jdbcUrl;
private String username;
private String password;
private String tableIdentifier;
@@ -91,11 +98,18 @@ public class DorisOptions extends DorisConnectionOptions {
return this;
}
+ /**
+ * not required, fe jdbc url, for lookup query
+ */
+ public Builder setJdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ return this;
+ }
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
- return new DorisOptions(fenodes, username, password, tableIdentifier);
+ return new DorisOptions(fenodes, username, password, tableIdentifier, jdbcUrl);
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/JdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/JdbcConnectionProvider.java
new file mode 100644
index 0000000..3e840c0
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/JdbcConnectionProvider.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.connection;
+
+import java.sql.Connection;
+
+/**
+ * Contract for obtaining and releasing a JDBC {@link Connection}.
+ */
+public interface JdbcConnectionProvider {
+
+ /**
+ * Get the existing connection or establish a new one if there is none.
+ *
+ * @return the JDBC connection
+ * @throws Exception declared so that implementations can report their own failures
+ */
+ Connection getOrEstablishConnection() throws Exception;
+
+
+ /** Close the existing connection, if any. */
+ void closeConnection();
+
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
new file mode 100644
index 0000000..4d42cd6
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.connection;
+
+import org.apache.doris.flink.cfg.DorisConnectionOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Objects;
+
+/**
+ * {@link JdbcConnectionProvider} that keeps at most one JDBC connection, opened through
+ * {@link DriverManager} with the URL and credentials of the given {@link DorisConnectionOptions}.
+ *
+ * <p>The connection is transient, so it is not part of the serialized form.
+ */
+public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
+    private static final long serialVersionUID = 1L;
+
+    private final DorisConnectionOptions options;
+
+    private transient Connection connection;
+
+    public SimpleJdbcConnectionProvider(DorisConnectionOptions options){
+        this.options = options;
+    }
+
+    /**
+     * Returns the cached connection while it is open; otherwise loads the MySQL driver and
+     * opens (and caches) a new connection.
+     *
+     * @throws ClassNotFoundException if neither MySQL driver class can be loaded
+     * @throws SQLException if checking or opening the connection fails
+     */
+    @Override
+    public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
+        boolean reusable = connection != null && !connection.isClosed();
+        if (!reusable) {
+            loadDriver();
+            connection = openConnection();
+        }
+        return connection;
+    }
+
+    /** Loads the Connector/J 8 driver class, falling back to the legacy class name. */
+    private static void loadDriver() throws ClassNotFoundException {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+        } catch (ClassNotFoundException ex) {
+            LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver");
+            Class.forName("com.mysql.jdbc.Driver");
+        }
+    }
+
+    /** Opens a connection, passing credentials only when a username is configured. */
+    private Connection openConnection() throws SQLException {
+        if (options.getUsername() == null) {
+            return DriverManager.getConnection(options.getJdbcUrl());
+        }
+        return DriverManager.getConnection(options.getJdbcUrl(), options.getUsername(), options.getPassword());
+    }
+
+    /** Closes the cached connection if present; a close failure is logged, not rethrown. */
+    @Override
+    public void closeConnection() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            LOG.warn("JDBC connection close failed.", e);
+        } finally {
+            // Drop the reference even when close() failed so the next call reconnects.
+            connection = null;
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 79c3b81..435c0f3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -200,7 +200,7 @@ public class DorisRowConverter implements Serializable {
return ((index, val) -> null);
case CHAR:
case VARCHAR:
- return (index, val) -> val.getString(index);
+ return (index, val) -> val.getString(index).toString();
case BOOLEAN:
return (index, val) -> val.getBoolean(index);
case BINARY:
@@ -209,7 +209,7 @@ public class DorisRowConverter implements Serializable {
case DECIMAL:
final int decimalPrecision = ((DecimalType) type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
- return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale);
+ return (index, val) -> val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
case TINYINT:
return (index, val) -> val.getByte(index);
case SMALLINT:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
new file mode 100644
index 0000000..9090783
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * {@link DorisLookupReader} that resolves lookups through an {@link ExecutionPool}.
+ *
+ * <p>The incoming key row is converted to a {@link Record}, wrapped in a {@link Get} and handed
+ * to the pool; the records that come back are converted to Flink {@link RowData}.
+ */
+public class DorisJdbcLookupReader extends DorisLookupReader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisJdbcLookupReader.class);
+
+    private final ExecutionPool pool;
+
+    /** Converter built from all field types of the lookup schema; used for result rows. */
+    private final DorisRowConverter converter;
+
+    /** Converter built from the key field types only; used for the incoming lookup row. */
+    private final DorisRowConverter keyConverter;
+
+    private final LookupSchema schema;
+
+    public DorisJdbcLookupReader(DorisOptions options, DorisLookupOptions lookupOptions, LookupSchema lookupSchema) {
+        this.converter = new DorisRowConverter(lookupSchema.getFieldTypes());
+        this.pool = new ExecutionPool(options, lookupOptions);
+        this.schema = lookupSchema;
+        // Must run after schema is assigned: the key converter is derived from it.
+        this.keyConverter = buildKeyConvert();
+    }
+
+    /** Builds a converter over the data types of the key columns, in key order. */
+    private DorisRowConverter buildKeyConvert() {
+        int[] keyIndex = schema.getKeyIndex();
+        DataType[] fieldTypes = schema.getFieldTypes();
+        DataType[] keyTypes = new DataType[keyIndex.length];
+        for (int i = 0; i < keyIndex.length; i++) {
+            keyTypes[i] = fieldTypes[keyIndex[i]];
+        }
+        return new DorisRowConverter(keyTypes);
+    }
+
+    /**
+     * Submits the lookup to the pool and returns a future for the converted rows.
+     *
+     * <p>A {@code null} result from the pool is reported as an empty list; any failure, including
+     * one raised while converting the result, completes the returned future exceptionally.
+     *
+     * @param recordIn row holding the key values, in key order
+     * @return future completed with the matching rows
+     */
+    @Override
+    public CompletableFuture<List<RowData>> asyncGet(RowData recordIn) throws IOException {
+        CompletableFuture<List<RowData>> result = new CompletableFuture<>();
+        Record record = convertRecord(recordIn);
+        try {
+            pool.get(new Get(record))
+                    .handleAsync((resultRow, throwable) -> {
+                        try {
+                            if (throwable != null) {
+                                result.completeExceptionally(throwable);
+                            } else if (resultRow == null) {
+                                result.complete(new ArrayList<>());
+                            } else {
+                                // convert Record to RowData
+                                result.complete(convertRowDataList(resultRow));
+                            }
+                        } catch (Throwable e) {
+                            result.completeExceptionally(e);
+                        }
+                        return null;
+                    });
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+        }
+        return result;
+    }
+
+    /** Converts every record of the list; a {@code null} record yields a {@code null} entry. */
+    private List<RowData> convertRowDataList(List<Record> records) {
+        List<RowData> results = new ArrayList<>(records.size());
+        for (Record record : records) {
+            results.add(convertRowData(record));
+        }
+        return results;
+    }
+
+    private RowData convertRowData(Record record) {
+        if (record == null) {
+            return null;
+        }
+        Object[] values = record.getValues();
+        GenericRowData rowData = converter.convertInternal(Arrays.asList(values));
+        return rowData;
+    }
+
+    /** Copies the key values of the incoming row into a new record at their schema positions. */
+    private Record convertRecord(RowData recordIn) {
+        Record record = new Record(schema);
+        int[] keyIndex = schema.getKeyIndex();
+
+        for (int index = 0; index < keyIndex.length; index++) {
+            Object value = keyConverter.convertExternal(recordIn, index);
+            record.setObject(keyIndex[index], value);
+        }
+        return record;
+    }
+
+    /**
+     * Blocking variant of {@link #asyncGet(RowData)}.
+     *
+     * @throws IOException if the lookup fails or the calling thread is interrupted while
+     *     waiting; in the latter case the thread's interrupt flag is restored first
+     */
+    @Override
+    public List<RowData> get(RowData record) throws IOException {
+        try {
+            return this.asyncGet(record).get();
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to the caller's thread.
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        } catch (ExecutionException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
new file mode 100644
index 0000000..b6c5073
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for lookup readers; offers a future-based and a directly-returning lookup.
+ */
+public abstract class DorisLookupReader {
+
+ /**
+ * Looks up the rows for {@code record} and returns a future for the result.
+ *
+ * @param record the row to look up
+ * @return future holding the list of matching rows
+ * @throws IOException declared for implementations to report lookup failures
+ */
+ public abstract CompletableFuture<List<RowData>> asyncGet(RowData record) throws IOException;
+
+ /**
+ * Looks up the rows for {@code record} and returns them directly.
+ *
+ * @param record the row to look up
+ * @return the list of matching rows
+ * @throws IOException declared for implementations to report lookup failures
+ */
+ public abstract List<RowData> get(RowData record) throws IOException;
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
new file mode 100644
index 0000000..c9638f5
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Batches individual {@link Get} requests and hands them to a fixed set of {@link Worker}s.
+ *
+ * <p>Requests are appended to a bounded queue. A single {@link ActionWatcher} thread drains the
+ * queue in batches, groups each batch by table identifier, wraps every group in a
+ * {@link GetAction} and submits it to the first worker that accepts it. A semaphore with one
+ * permit per worker bounds the number of actions that are in flight.
+ */
+public class ExecutionPool implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutionPool.class);
+    private final ActionWatcher readActionWatcher;
+    final ArrayBlockingQueue<Get> queue;
+    private final AtomicBoolean started; //determine whether the executionPool is running
+    private final AtomicBoolean workerStated; //determine whether the worker is running
+    ExecutorService actionWatcherExecutorService;
+    ExecutorService workerExecutorService;
+    ThreadFactory workerThreadFactory;
+    ThreadFactory actionWatcherThreadFactory;
+    private final Worker[] workers;
+
+    // Written by start()/close() and read by the watcher thread, hence volatile.
+    private volatile Semaphore semaphore;
+
+    private final int jdbcReadThreadSize;
+
+    /**
+     * Creates the workers and the watcher and starts them immediately.
+     *
+     * @param options connection options passed on to every worker
+     * @param lookupOptions supplies the queue size, batch size and number of worker threads
+     */
+    public ExecutionPool(DorisOptions options, DorisLookupOptions lookupOptions) {
+        started = new AtomicBoolean(false);
+        workerStated = new AtomicBoolean(false);
+        workerThreadFactory = new DefaultThreadFactory("worker");
+        actionWatcherThreadFactory = new DefaultThreadFactory("action-watcher");
+        this.queue = new ArrayBlockingQueue<>(lookupOptions.getJdbcReadBatchQueueSize());
+        this.readActionWatcher = new ActionWatcher(lookupOptions.getJdbcReadBatchSize());
+        this.workers = new Worker[lookupOptions.getJdbcReadThreadSize()];
+        for (int i = 0; i < workers.length; ++i) {
+            workers[i] = new Worker(workerStated, options, lookupOptions, i);
+        }
+        this.jdbcReadThreadSize = lookupOptions.getJdbcReadThreadSize();
+        start();
+    }
+
+    private void start() {
+        if (started.compareAndSet(false, true)) {
+            workerStated.set(true);
+            // Publish the permits before the watcher runs, so that no action is
+            // submitted without one.
+            this.semaphore = new Semaphore(jdbcReadThreadSize);
+            workerExecutorService = new ThreadPoolExecutor(workers.length, workers.length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), workerThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+            actionWatcherExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), actionWatcherThreadFactory, new ThreadPoolExecutor.AbortPolicy());
+            for (Worker worker : workers) {
+                workerExecutorService.execute(worker);
+            }
+            actionWatcherExecutorService.execute(readActionWatcher);
+        }
+    }
+
+    /**
+     * Enqueues a lookup request.
+     *
+     * @param get the request; a fresh future is attached to it
+     * @return the future attached to {@code get}; it is completed exceptionally if the request
+     *     could not be enqueued within 10 seconds or the calling thread was interrupted
+     */
+    public CompletableFuture<List<Record>> get(Get get) {
+        appendGet(get);
+        return get.getFuture();
+    }
+
+    private void appendGet(Get get) {
+        get.setFuture(new CompletableFuture<>());
+        try {
+            if (!queue.offer(get, 10000L, TimeUnit.MILLISECONDS)) {
+                get.getFuture().completeExceptionally(new TimeoutException());
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the caller can still observe the interruption.
+            Thread.currentThread().interrupt();
+            get.getFuture().completeExceptionally(e);
+        }
+    }
+
+    /** Stops the watcher and the workers; calling it again has no effect. */
+    @Override
+    public void close() throws IOException {
+        if (started.compareAndSet(true, false)) {
+            actionWatcherExecutorService.shutdown();
+            workerStated.set(false);
+            workerExecutorService.shutdown();
+            this.semaphore = null;
+        }
+    }
+
+    /**
+     * Tries to hand an action to one of the workers.
+     *
+     * @param action the batch to execute
+     * @return {@code true} if a worker accepted the action; {@code false} if no permit became
+     *     available within 2 seconds or no worker accepted it
+     * @throws RuntimeException if the calling thread is interrupted while waiting for a permit;
+     *     the interrupt flag is restored before throwing
+     */
+    public boolean submit(GetAction action) {
+        // Read the field once: close() may set it to null concurrently.
+        final Semaphore permits = this.semaphore;
+        //if has semaphore, try to obtain the semaphore, otherwise return submit failure
+        if (permits != null) {
+            try {
+                if (!permits.tryAcquire(2000L, TimeUnit.MILLISECONDS)) {
+                    return false;
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("get semaphore be interrupt", e);
+            }
+            action.setSemaphore(permits);
+        }
+
+        //try to submit to worker
+        for (Worker worker : workers) {
+            if (worker.offer(action)) {
+                return true;
+            }
+        }
+        //If submit fails, it will be released, and if successful, the worker will be responsible for the release
+        if (permits != null) {
+            permits.release();
+        }
+        return false;
+    }
+
+    /**
+     * monitor the query action
+     * and submit it to the worker as soon as the data arrives
+     */
+    class ActionWatcher implements Runnable {
+        private final int batchSize;
+
+        public ActionWatcher(int batchSize) {
+            this.batchSize = batchSize;
+        }
+
+        @Override
+        public void run() {
+            LOG.info("action watcher start");
+            List<Get> recordList = new ArrayList<>(batchSize);
+            while (started.get()) {
+                try {
+                    recordList.clear();
+                    Get firstGet = queue.poll(2, TimeUnit.SECONDS);
+                    if (firstGet == null) {
+                        continue;
+                    }
+                    recordList.add(firstGet);
+                    queue.drainTo(recordList, batchSize - 1);
+                    Map<String, List<Get>> getsByTable = new HashMap<>();
+                    for (Get get : recordList) {
+                        getsByTable
+                                .computeIfAbsent(get.getRecord().getTableIdentifier(), (s) -> new ArrayList<>())
+                                .add(get);
+                    }
+                    for (List<Get> gets : getsByTable.values()) {
+                        GetAction getAction = new GetAction(gets);
+                        while (!submit(getAction)) {
+                            // Stop retrying once the pool is closed; the generic handler
+                            // below fails the futures of this batch that are still open.
+                            if (!started.get()) {
+                                throw new IllegalStateException("execution pool is closed");
+                            }
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                } catch (Exception e) {
+                    for (Get get : recordList) {
+                        if (!get.getFuture().isDone()) {
+                            get.getFuture().completeExceptionally(e);
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "ActionWatcher{" +
+                    "batchSize=" + batchSize +
+                    '}';
+        }
+    }
+
+    /** Creates non-daemon threads named {@code pool-<n>-<name>-<m>}. */
+    static class DefaultThreadFactory implements ThreadFactory {
+        private static final AtomicInteger poolNumber = new AtomicInteger(1);
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        DefaultThreadFactory(String name) {
+            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+            t.setDaemon(false);
+            return t;
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Get.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Get.java
new file mode 100644
index 0000000..ba3efcc
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Get.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A single lookup request: the {@link Record} to look up together with the future through
+ * which the matching records are reported.
+ */
+public class Get {
+    Record record;
+    CompletableFuture<List<Record>> future;
+
+    /**
+     * @param record the record to look up; the future is attached separately via
+     *     {@link #setFuture(CompletableFuture)}
+     */
+    public Get(Record record) {
+        this.record = record;
+    }
+
+    /** Attaches the future through which the result of this request is reported. */
+    public void setFuture(CompletableFuture<List<Record>> future) {
+        this.future = future;
+    }
+
+    /** Returns the attached future, or {@code null} if none has been set. */
+    public CompletableFuture<List<Record>> getFuture() {
+        return future;
+    }
+
+    /** Returns the record this request was created with. */
+    public Record getRecord() {
+        return this.record;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/GetAction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/GetAction.java
new file mode 100644
index 0000000..db59ec7
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/GetAction.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+/**
+ * A batch of {@link Get} requests that is executed as one unit, optionally carrying the
+ * {@link Semaphore} from which a permit was taken for it.
+ */
+public class GetAction {
+
+    List<Get> getList;
+
+    Semaphore semaphore;
+
+    /**
+     * @param getList the requests that make up this batch
+     */
+    public GetAction(List<Get> getList) {
+        this.getList = getList;
+    }
+
+    /** Sets the semaphore associated with this action. */
+    public void setSemaphore(Semaphore semaphore) {
+        this.semaphore = semaphore;
+    }
+
+    /** Returns the associated semaphore, or {@code null} if none has been set. */
+    public Semaphore getSemaphore() {
+        return this.semaphore;
+    }
+
+    /** Returns the requests of this batch. */
+    public List<Get> getGetList() {
+        return this.getList;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupSchema.java
new file mode 100644
index 0000000..698427f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/LookupSchema.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+ /**
+  * Serializable description of a lookup query: the table to read, the columns to select,
+  * their data types, the columns used as conditions and the key index array.
+  */
+ public class LookupSchema implements Serializable {
+
+     private String tableIdentifier;
+     private String[] selectFields;
+     private String[] conditionFields;
+     private DataType[] fieldTypes;
+     private int[] keyIndex;
+
+     /**
+      * @param tableIdentifier identifier of the table to query
+      * @param selectFields names of the selected columns
+      * @param fieldTypes data types of the fields
+      * @param conditionFields names of the columns used as lookup conditions
+      * @param keyIndex key index array
+      */
+     public LookupSchema(String tableIdentifier, String[] selectFields, DataType[] fieldTypes, String[] conditionFields, int[] keyIndex) {
+         this.tableIdentifier = tableIdentifier;
+         this.selectFields = selectFields;
+         this.fieldTypes = fieldTypes;
+         this.conditionFields = conditionFields;
+         this.keyIndex = keyIndex;
+     }
+
+     // --- tableIdentifier ---
+
+     public String getTableIdentifier() {
+         return this.tableIdentifier;
+     }
+
+     public void setTableIdentifier(String tableIdentifier) {
+         this.tableIdentifier = tableIdentifier;
+     }
+
+     // --- selectFields ---
+
+     public String[] getSelectFields() {
+         return this.selectFields;
+     }
+
+     public void setSelectFields(String[] selectFields) {
+         this.selectFields = selectFields;
+     }
+
+     // --- conditionFields ---
+
+     public String[] getConditionFields() {
+         return this.conditionFields;
+     }
+
+     public void setConditionFields(String[] conditionFields) {
+         this.conditionFields = conditionFields;
+     }
+
+     // --- fieldTypes ---
+
+     public DataType[] getFieldTypes() {
+         return this.fieldTypes;
+     }
+
+     public void setFieldTypes(DataType[] fieldTypes) {
+         this.fieldTypes = fieldTypes;
+     }
+
+     // --- keyIndex ---
+
+     public int[] getKeyIndex() {
+         return this.keyIndex;
+     }
+
+     public void setKeyIndex(int[] keyIndex) {
+         this.keyIndex = keyIndex;
+     }
+
+     /** Renders every field; arrays are printed with {@link Arrays#toString}. */
+     @Override
+     public String toString() {
+         StringBuilder text = new StringBuilder("LookupSchema{");
+         text.append("tableIdentifier='").append(tableIdentifier).append('\'');
+         text.append(", selectFields=").append(Arrays.toString(selectFields));
+         text.append(", conditionFields=").append(Arrays.toString(conditionFields));
+         text.append(", fieldTypes=").append(Arrays.toString(fieldTypes));
+         text.append(", keyIndex=").append(Arrays.toString(keyIndex));
+         text.append('}');
+         return text.toString();
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Record.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Record.java
new file mode 100644
index 0000000..172d892
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Record.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.types.DataType;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * record
+ */
+public class Record implements Serializable {
+
+ LookupSchema schema;
+ Object[] values;
+
+ public Record(LookupSchema schema) {
+ this.schema = schema;
+ values = new Object[schema.getFieldTypes().length];
+ }
+
+ public LookupSchema getSchema() {
+ return schema;
+ }
+
+ public Object getObject(int index) {
+ return values[index];
+ }
+
+ public void setObject(int index, Object obj) {
+ values[index] = obj;
+ }
+
+ public String getTableIdentifier() {
+ return schema.getTableIdentifier();
+ }
+
+ public String[] getSelectFields() {
+ return schema.getSelectFields();
+ }
+
+ public String[] getConditionFields() {
+ return schema.getConditionFields();
+ }
+
+ public DataType[] getFieldTypes() {
+ return schema.getFieldTypes();
+ }
+
+ public int[] getKeyIndex() {
+ return schema.getKeyIndex();
+ }
+
+ public Object[] getValues() {
+ return values;
+ }
+
+ @Override
+ public String toString() {
+ return "Record{" +
+ "schema=" + schema +
+ ", values=" + Arrays.toString(values) +
+ '}';
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/RecordKey.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/RecordKey.java
new file mode 100644
index 0000000..805a03f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/RecordKey.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import java.lang.reflect.Array;
+
+ /**
+  * Map key that identifies a {@link Record} by the values stored at its key indexes.
+  *
+  * <p>Array values are compared and hashed by content, including nested arrays, so that
+  * {@link #equals(Object)} and {@link #hashCode()} stay consistent with each other.
+  */
+ public class RecordKey {
+     Record record;
+     int[] keys;
+
+     /**
+      * @param record record to identify; its key index array is captured for hashing
+      */
+     public RecordKey(Record record) {
+         this.record = record;
+         keys = record.getKeyIndex();
+     }
+
+     /**
+      * Two keys are equal when they wrap the same record instance, or when both records have
+      * the same non-zero number of key indexes and hold equal values at each of this record's
+      * key indexes. Records without key indexes are only equal to themselves.
+      */
+     @Override
+     public boolean equals(Object o) {
+         if (this == o) {
+             return true;
+         }
+         if (o == null || getClass() != o.getClass()) {
+             return false;
+         }
+
+         RecordKey other = (RecordKey) o;
+         if (record == other.record) {
+             return true;
+         }
+
+         int[] keyIndex = record.getKeyIndex();
+         if (keyIndex.length == 0 || keyIndex.length != other.record.getKeyIndex().length) {
+             return false;
+         }
+         for (int i : keyIndex) {
+             if (!equals(record.getObject(i), other.record.getObject(i))) {
+                 return false;
+             }
+         }
+         return true;
+     }
+
+     @Override
+     public int hashCode() {
+         return hash(record, keys);
+     }
+
+     /**
+      * Null-safe equality that compares arrays element by element (recursively).
+      *
+      * @return {@code true} when both are {@code null}, both are arrays with pairwise equal
+      *     elements, or both are non-arrays and {@code obj0.equals(obj1)}
+      */
+     public static boolean equals(Object obj0, Object obj1) {
+         if (obj0 == null || obj1 == null) {
+             return obj0 == obj1;
+         }
+
+         boolean isArray0 = obj0.getClass().isArray();
+         boolean isArray1 = obj1.getClass().isArray();
+         if (isArray0 != isArray1) {
+             return false;
+         }
+         if (!isArray0) {
+             return obj0.equals(obj1);
+         }
+
+         int length = Array.getLength(obj0);
+         if (length != Array.getLength(obj1)) {
+             return false;
+         }
+         for (int i = 0; i < length; ++i) {
+             if (!equals(Array.get(obj0, i), Array.get(obj1, i))) {
+                 return false;
+             }
+         }
+         return true;
+     }
+
+     /**
+      * XOR-combines the hashes of the record values found at {@code indexes}.
+      *
+      * @return the combined hash, or {@code 0} when {@code indexes} is empty
+      */
+     public static int hash(Record record, int[] indexes) {
+         int hash = 0;
+         for (int i : indexes) {
+             // XOR with the initial 0 yields the first value's hash unchanged.
+             hash ^= hashCode(record.getObject(i));
+         }
+         return hash;
+     }
+
+     /**
+      * Null-safe hash that hashes arrays by content.
+      *
+      * <p>Children are hashed recursively: calling {@code child.hashCode()} directly would give
+      * nested arrays an identity hash, although {@link #equals(Object, Object)} treats
+      * content-equal nested arrays as equal.
+      */
+     public static int hashCode(Object obj) {
+         if (obj == null) {
+             return 0;
+         }
+         if (!obj.getClass().isArray()) {
+             return obj.hashCode();
+         }
+         int hash = 0;
+         int length = Array.getLength(obj);
+         for (int i = 0; i < length; ++i) {
+             hash = hash * 31 + hashCode(Array.get(obj, i));
+         }
+         return hash;
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
new file mode 100644
index 0000000..e29bfd1
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.connection.JdbcConnectionProvider;
+import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+ /**
+  * Background task that executes batched lookup queries against Doris over JDBC.
+  *
+  * <p>Each worker creates its own {@link JdbcConnectionProvider}, holds at most one pending
+  * {@link GetAction} and keeps polling for work until the shared {@code started} flag is
+  * cleared or the polling loop fails.
+  */
+ public class Worker implements Runnable {
+     private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
+     private final String name;
+     private final AtomicBoolean started;
+     private final JdbcConnectionProvider jdbcConnectionProvider;
+     // Capacity 1: offer() returns false while a previous action is still waiting to be polled.
+     private final ArrayBlockingQueue<GetAction> queue = new ArrayBlockingQueue<>(1);
+     private final int maxRetryTimes;
+     // Error that terminated the run loop; rethrown to callers of offer().
+     private final AtomicReference<Throwable> exception = new AtomicReference<>(null);
+
+     /**
+      * @param started shared flag; the worker loop runs while it is {@code true}
+      * @param options connection options passed to the JDBC connection provider
+      * @param lookupOptions supplies the maximum number of query retries
+      * @param index used to build the worker name {@code "Worker-" + index}
+      */
+     public Worker(AtomicBoolean started, DorisOptions options, DorisLookupOptions lookupOptions, int index) {
+         this.started = started;
+         this.name = "Worker-" + index;
+         this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
+         this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+     }
+
+     /**
+      * Hands an action to this worker without blocking.
+      *
+      * @return {@code true} if the action was queued, {@code false} if the queue is full
+      * @throws DorisRuntimeException if the worker loop previously stopped with an error
+      */
+     public boolean offer(GetAction action) {
+         if (exception.get() != null) {
+             throw new DorisRuntimeException(exception.get());
+         }
+         return queue.offer(action);
+     }
+
+     /**
+      * Polls the queue (2 second timeout per poll) and handles each action, releasing the
+      * action's semaphore afterwards when one is attached. The JDBC connection is closed when
+      * the loop ends, whatever the reason.
+      */
+     @Override
+     public void run() {
+         LOG.info("worker:{} start", this);
+         try {
+             while (started.get()) {
+                 try {
+                     GetAction action = queue.poll(2000L, TimeUnit.MILLISECONDS);
+                     if (action == null) {
+                         continue;
+                     }
+                     try {
+                         handle(action);
+                     } finally {
+                         if (action.getSemaphore() != null) {
+                             action.getSemaphore().release();
+                         }
+                     }
+                 } catch (InterruptedException e) {
+                     LOG.error("worker running error", e);
+                     exception.set(e);
+                     // Keep the interrupt visible to whoever owns this thread.
+                     Thread.currentThread().interrupt();
+                     break;
+                 } catch (Exception e) {
+                     LOG.error("worker running error", e);
+                     exception.set(e);
+                     break;
+                 }
+             }
+         } finally {
+             LOG.info("worker:{} stop", this);
+             jdbcConnectionProvider.closeConnection();
+         }
+     }
+
+     /**
+      * Runs one batched query for all requests of the action and completes each request's
+      * future with the rows matching its key ({@code null} when no row matched). If the query
+      * fails, every future that is not done yet is completed exceptionally.
+      */
+     private void handle(GetAction action) {
+         List<Get> recordList = action.getGetList();
+         if (recordList.isEmpty()) {
+             return;
+         }
+         // The schema of the first request is used for the whole batch.
+         LookupSchema schema = recordList.get(0).getRecord().getSchema();
+
+         String sql = buildBatchSql(schema, recordList.size());
+         LOG.debug("query sql is {}", sql);
+         try {
+             Map<RecordKey, List<Record>> resultRecordMap = executeQuery(sql, recordList, schema);
+             for (Get get : recordList) {
+                 if (get.getFuture() != null) {
+                     RecordKey key = new RecordKey(get.getRecord());
+                     get.getFuture().complete(resultRecordMap.get(key));
+                 }
+             }
+         } catch (Exception e) {
+             for (Get get : recordList) {
+                 if (get.getFuture() != null && !get.getFuture().isDone()) {
+                     get.getFuture().completeExceptionally(e);
+                 }
+             }
+         }
+     }
+
+     /**
+      * Builds {@code batchSize} identical parameterized selects joined with {@code union all}.
+      * The single select is rendered once because it does not depend on the individual request.
+      */
+     private String buildBatchSql(LookupSchema schema, int batchSize) {
+         StringBuilder single = new StringBuilder("select ");
+         String[] selectFields = schema.getSelectFields();
+         for (int i = 0; i < selectFields.length; i++) {
+             if (i > 0) {
+                 single.append(",");
+             }
+             single.append(quoteIdentifier(selectFields[i]));
+         }
+         single.append(" from ").append(schema.getTableIdentifier());
+         single.append(" where ( ");
+         String[] conditionFields = schema.getConditionFields();
+         for (int i = 0; i < conditionFields.length; i++) {
+             if (i > 0) {
+                 single.append(" and ");
+             }
+             single.append(quoteIdentifier(conditionFields[i])).append("=?");
+         }
+         single.append(" ) ");
+
+         StringBuilder sb = new StringBuilder();
+         for (int i = 0; i < batchSize; i++) {
+             if (i > 0) {
+                 sb.append(" union all ");
+             }
+             sb.append(single);
+         }
+         return sb.toString();
+     }
+
+     /**
+      * Executes the batched query and groups the returned rows by {@link RecordKey}.
+      *
+      * <p>The query is attempted once and retried up to {@code maxRetryTimes} more times,
+      * sleeping {@code 1000 * retry} milliseconds after a failed attempt. A negative
+      * {@code maxRetryTimes} is treated as zero so the query always runs at least once.
+      *
+      * @throws RuntimeException wrapping the last failure once retries are exhausted, or the
+      *     {@link InterruptedException} if the thread is interrupted while waiting to retry
+      */
+     private Map<RecordKey, List<Record>> executeQuery(String sql,
+                                                       List<Get> recordList,
+                                                       LookupSchema schema) {
+         final int maxRetries = Math.max(0, maxRetryTimes);
+         for (int retry = 0; ; retry++) {
+             // Start from an empty map on every attempt so a failed attempt leaves no partial rows.
+             Map<RecordKey, List<Record>> resultRecordMap = new HashMap<>();
+             try {
+                 Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
+                 try (PreparedStatement ps = conn.prepareStatement(sql)) {
+                     // Bind the key values of every request, in the same order as the selects.
+                     int paramIndex = 0;
+                     for (Get get : recordList) {
+                         Record record = get.getRecord();
+                         for (int keyIndex : schema.getKeyIndex()) {
+                             ps.setObject(++paramIndex, record.getObject(keyIndex));
+                         }
+                     }
+
+                     try (ResultSet rs = ps.executeQuery()) {
+                         int fieldCount = schema.getFieldTypes().length;
+                         while (rs.next()) {
+                             Record record = new Record(schema);
+                             for (int index = 0; index < fieldCount; index++) {
+                                 record.setObject(index, rs.getObject(index + 1));
+                             }
+                             resultRecordMap
+                                     .computeIfAbsent(new RecordKey(record), k -> new ArrayList<>())
+                                     .add(record);
+                         }
+                     }
+                 }
+                 return resultRecordMap;
+             } catch (Exception e) {
+                 LOG.error("query doris error, retry times = {}", retry, e);
+                 if (retry >= maxRetries) {
+                     throw new RuntimeException(e);
+                 }
+                 try {
+                     Thread.sleep(1000L * retry);
+                 } catch (InterruptedException ex) {
+                     // Restore the flag so the worker loop notices the interruption.
+                     Thread.currentThread().interrupt();
+                     throw new RuntimeException(ex);
+                 }
+             }
+         }
+     }
+
+     /** Wraps the identifier in backticks. */
+     public String quoteIdentifier(String identifier) {
+         return "`" + identifier + "`";
+     }
+
+     @Override
+     public String toString() {
+         return name;
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 391df7d..a637967 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -42,6 +42,8 @@ public class DorisConfigOptions {
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password.");
+ public static final ConfigOption<String> JDBC_URL = ConfigOptions.key("jdbc-url").stringType().noDefaultValue().withDescription("doris jdbc url address.");
+
// source config options
public static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
.key("doris.read.field")
@@ -123,9 +125,34 @@ public class DorisConfigOptions {
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
- .defaultValue(3)
+ .defaultValue(1)
.withDescription("The max retry times if lookup database failed.");
+ public static final ConfigOption<Integer> LOOKUP_JDBC_READ_BATCH_SIZE =
+ ConfigOptions.key("lookup.jdbc.read.batch.size")
+ .intType()
+ .defaultValue(128)
+ .withDescription("when dimension table query, save the maximum number of batches.");
+
+ public static final ConfigOption<Integer> LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE =
+ ConfigOptions.key("lookup.jdbc.read.batch.queue-size")
+ .intType()
+ .defaultValue(256)
+ .withDescription("dimension table query request buffer queue size.");
+
+ public static final ConfigOption<Integer> LOOKUP_JDBC_READ_THREAD_SIZE =
+ ConfigOptions.key("lookup.jdbc.read.thread-size")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the number of threads for dimension table query, each query occupies a JDBC connection");
+
+ public static final ConfigOption<Boolean> LOOKUP_JDBC_ASYNC =
+ ConfigOptions.key("lookup.jdbc.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("whether to set async lookup");
+
+
// sink config options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC = ConfigOptions
.key("sink.enable-2pc")
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index df7b867..81c756c 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -49,8 +49,13 @@ import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_REQUEST_RETR
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
+import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_MAX_ROWS;
import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_CACHE_TTL;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_ASYNC;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_BATCH_SIZE;
+import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_JDBC_READ_THREAD_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.LOOKUP_MAX_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
@@ -95,6 +100,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
+ options.add(JDBC_URL);
options.add(DORIS_READ_FIELD);
options.add(DORIS_FILTER_QUERY);
@@ -110,6 +116,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
options.add(LOOKUP_CACHE_MAX_ROWS);
options.add(LOOKUP_CACHE_TTL);
options.add(LOOKUP_MAX_RETRIES);
+ options.add(LOOKUP_JDBC_ASYNC);
+ options.add(LOOKUP_JDBC_READ_BATCH_SIZE);
+ options.add(LOOKUP_JDBC_READ_THREAD_SIZE);
+ options.add(LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE);
options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
@@ -148,6 +158,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
final String fenodes = readableConfig.get(FENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
+ .setJdbcUrl(readableConfig.get(JDBC_URL))
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
@@ -204,6 +215,10 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
builder.setCacheExpireMs(readableConfig.get(LOOKUP_CACHE_TTL).toMillis());
builder.setCacheMaxSize(readableConfig.get(LOOKUP_CACHE_MAX_ROWS));
builder.setMaxRetryTimes(readableConfig.get(LOOKUP_MAX_RETRIES));
+ builder.setJdbcReadBatchSize(readableConfig.get(LOOKUP_JDBC_READ_BATCH_SIZE));
+ builder.setJdbcReadBatchQueueSize(readableConfig.get(LOOKUP_JDBC_READ_BATCH_QUEUE_SIZE));
+ builder.setJdbcReadThreadSize(readableConfig.get(LOOKUP_JDBC_READ_THREAD_SIZE));
+ builder.setAsync(readableConfig.get(LOOKUP_JDBC_ASYNC));
return builder.build();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index c5e47fa..0fb096b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
@@ -126,19 +127,31 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
DataType physicalRowDataType = physicalSchema.toRowDataType();
String[] keyNames = new String[context.getKeys().length];
+ int[] keyIndexs = new int[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
+ keyIndexs[i] = innerKeyArr[0];
+ }
+ if (lookupOptions.isAsync()) {
+ return AsyncTableFunctionProvider.of(
+ new DorisRowDataAsyncLookupFunction(
+ options,
+ lookupOptions,
+ DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+ DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
+ keyNames,
+ keyIndexs));
+ } else {
+ return TableFunctionProvider.of(
+ new DorisRowDataJdbcLookupFunction(
+ options,
+ lookupOptions,
+ DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
+ DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
+ keyNames,
+ keyIndexs));
}
-
- return TableFunctionProvider.of(
- new DorisRowDataLookupFunction(
- options,
- readOptions,
- lookupOptions,
- DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
- DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
- keyNames));
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
new file mode 100644
index 0000000..b3c5f43
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
+import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupSchema;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+ /**
+  * Asynchronous lookup function that reads rows through a {@link DorisJdbcLookupReader} and
+  * optionally caches the results per lookup key.
+  */
+ public class DorisRowDataAsyncLookupFunction extends AsyncTableFunction<RowData> {
+     private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataAsyncLookupFunction.class);
+     private final DorisOptions options;
+     private final DorisLookupOptions lookupOptions;
+     private final long cacheMaxSize;
+     private final long cacheExpireMs;
+     // Built in open(); null when caching is disabled.
+     private transient Cache<RowData, List<RowData>> cache;
+     private DorisLookupReader lookupReader;
+     private LookupSchema lookupSchema;
+
+     /**
+      * @param options connection options; {@code getJdbcUrl()} must not be {@code null}
+      * @param lookupOptions lookup options (cache size, cache expiry, reader settings)
+      * @param selectFields names of the selected columns
+      * @param fieldTypes data types of the fields
+      * @param conditionFields names of the lookup key columns
+      * @param keyIndex key index array passed on to the {@link LookupSchema}
+      * @throws NullPointerException if no JDBC URL is configured
+      */
+     public DorisRowDataAsyncLookupFunction(DorisOptions options,
+                                            DorisLookupOptions lookupOptions,
+                                            String[] selectFields,
+                                            DataType[] fieldTypes,
+                                            String[] conditionFields,
+                                            int[] keyIndex) {
+         Preconditions.checkNotNull(options.getJdbcUrl(), "jdbc-url is required in jdbc mode lookup");
+         this.options = options;
+         this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+         this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+         this.lookupOptions = lookupOptions;
+         this.lookupSchema = new LookupSchema(options.getTableIdentifier(), selectFields, fieldTypes, conditionFields, keyIndex);
+     }
+
+     /**
+      * Creates the cache (skipped when the max size or the expiry is {@code -1}) and the
+      * lookup reader.
+      */
+     @Override
+     public void open(FunctionContext context) throws Exception {
+         super.open(context);
+         this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
+                 ? null
+                 : CacheBuilder.newBuilder()
+                 .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                 .maximumSize(cacheMaxSize)
+                 .build();
+         this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+     }
+
+     /**
+      * This is a lookup method which is called by Flink framework in runtime.
+      *
+      * <p>A cache hit completes the future immediately. On a cache miss, or when caching is
+      * disabled, the rows are fetched asynchronously, stored in the cache when it is enabled
+      * (an empty list for "no match") and then used to complete the future.
+      *
+      * @param future completed with the matching rows, or exceptionally if the lookup fails
+      * @param keys lookup keys
+      */
+     public void eval(CompletableFuture<Collection<RowData>> future, Object... keys) throws IOException {
+         RowData keyRow = GenericRowData.of(keys);
+         if (cache != null) {
+             List<RowData> cachedRows = cache.getIfPresent(keyRow);
+             if (cachedRows != null) {
+                 future.complete(cachedRows);
+                 // Return only on a hit; a miss must fall through to the reader, otherwise
+                 // the future would never be completed.
+                 return;
+             }
+         }
+         CompletableFuture<List<RowData>> resultFuture = lookupReader.asyncGet(keyRow);
+         resultFuture.handleAsync(
+                 (resultRows, throwable) -> {
+                     try {
+                         if (throwable != null) {
+                             future.completeExceptionally(throwable);
+                             return null;
+                         }
+                         List<RowData> rows;
+                         if (resultRows == null || resultRows.isEmpty()) {
+                             rows = Collections.emptyList();
+                         } else {
+                             rows = resultRows;
+                         }
+                         if (cache != null) {
+                             cache.put(keyRow, rows);
+                         }
+                         future.complete(rows);
+                     } catch (Throwable t) {
+                         future.completeExceptionally(t);
+                     }
+                     return null;
+                 });
+     }
+
+     @Override
+     public void close() throws Exception {
+         super.close();
+     }
+
+     /** @return the cache, or {@code null} when caching is disabled or {@code open} has not run */
+     @VisibleForTesting
+     public Cache<RowData, List<RowData>> getCache() {
+         return cache;
+     }
+ }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
new file mode 100644
index 0000000..93a0059
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.table;
+
+import org.apache.doris.flink.cfg.DorisLookupOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.lookup.DorisJdbcLookupReader;
+import org.apache.doris.flink.lookup.DorisLookupReader;
+import org.apache.doris.flink.lookup.LookupSchema;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * use jdbc to query
+ */
+public class DorisRowDataJdbcLookupFunction extends TableFunction<RowData> {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataJdbcLookupFunction.class);
+ private final DorisOptions options;
+ private final DorisLookupOptions lookupOptions;
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private transient Cache<RowData, List<RowData>> cache;
+ private DorisLookupReader lookupReader;
+ private LookupSchema lookupSchema;
+
+ public DorisRowDataJdbcLookupFunction(DorisOptions options,
+ DorisLookupOptions lookupOptions,
+ String[] selectFields,
+ DataType[] fieldTypes,
+ String[] conditionFields,
+ int[] keyIndex) {
+ Preconditions.checkNotNull(options.getJdbcUrl(), "jdbc-url is required in jdbc mode lookup");
+ this.options = options;
+ this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+ this.lookupOptions = lookupOptions;
+ this.lookupSchema = new LookupSchema(options.getTableIdentifier(), selectFields, fieldTypes, conditionFields, keyIndex);
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
+ ? null
+ : CacheBuilder.newBuilder()
+ .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+ .maximumSize(cacheMaxSize)
+ .build();
+ this.lookupReader = new DorisJdbcLookupReader(options, lookupOptions, lookupSchema);
+ }
+
+ /**
+ * This is a lookup method which is called by Flink framework in runtime.
+ *
+ * @param keys lookup keys
+ */
+ public void eval(Object... keys) throws IOException {
+ RowData keyRow = GenericRowData.of(keys);
+ if (cache != null) {
+ List<RowData> cachedRows = cache.getIfPresent(keyRow);
+ if (cachedRows != null) {
+ for (RowData cachedRow : cachedRows) {
+ collect(cachedRow);
+ }
+ return;
+ }
+ }
+ queryRecord(keyRow);
+ }
+
+ private void queryRecord(RowData keyRow) throws IOException {
+ List<RowData> rowData = lookupReader.get(keyRow);
+ if(rowData == null){
+ rowData = Collections.emptyList();
+ }
+ if(cache != null){
+ cache.put(keyRow, rowData);
+ }
+ rowData.forEach(this::collect);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ }
+
+ @VisibleForTesting
+ public Cache<RowData, List<RowData>> getCache() {
+ return cache;
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinExample.java
new file mode 100644
index 0000000..d3fbf87
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/LookupJoinExample.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.UUID;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Runnable example of a lookup join against a Doris table.
+ *
+ * <p>A generated stream of {@code (id, uuid)} records is registered as the view
+ * {@code doris_source} and left-joined, by processing time, with the Doris-backed table
+ * {@code lookup_dim_tbl} on {@code id = c_custkey}. The joined rows are printed.
+ */
+public class LookupJoinExample {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.disableOperatorChaining();
+        env.enableCheckpointing(30000);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        DataStreamSource<Tuple2<Integer,String>> source = env.addSource(new SequenceSource());
+        tEnv.createTemporaryView("doris_source",source,$("id"),$("uuid"),$("process_time").proctime());
+
+        tEnv.executeSql(
+                "CREATE TABLE lookup_dim_tbl (" +
+                        " c_custkey int," +
+                        " c_name string," +
+                        " c_address string," +
+                        " c_city string," +
+                        " c_nation string," +
+                        " c_region string," +
+                        " c_phone string," +
+                        " c_mktsegment string" +
+                        ") " +
+                        "WITH (\n" +
+                        " 'connector' = 'doris',\n" +
+                        " 'fenodes' = '127.0.0.1:8030',\n" +
+                        " 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n" +
+                        " 'table.identifier' = 'ssb.customer',\n" +
+                        " 'lookup.jdbc.async' = 'true',\n" +
+                        " 'username' = 'root',\n" +
+                        " 'password' = ''\n" +
+                        ")");
+
+        Table table = tEnv.sqlQuery("select a.id,a.uuid,b.c_name,b.c_nation,b.c_phone from doris_source a " +
+                "left join lookup_dim_tbl FOR SYSTEM_TIME AS OF a.process_time b " +
+                "ON a.id = b.c_custkey");
+
+        tEnv.toRetractStream(table, Row.class).print();
+        env.execute();
+    }
+
+    /**
+     * Emits one {@code (id, random uuid)} record per second with an increasing id, starting
+     * at 1, until {@link #cancel()} is called.
+     */
+    private static final class SequenceSource implements SourceFunction<Tuple2<Integer,String>> {
+        private static final long serialVersionUID = 1L;
+
+        // Written by cancel() and read by the run() loop, hence volatile.
+        private volatile boolean running = true;
+        private int id = 1;
+
+        @Override
+        public void run(SourceContext<Tuple2<Integer,String>> out) throws Exception {
+            while (running) {
+                Tuple2<Integer,String> record = new Tuple2<>(id++, UUID.randomUUID().toString());
+                out.collect(record);
+                Thread.sleep(1000);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            // Lets the run() loop exit instead of spinning forever after cancellation.
+            running = false;
+        }
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/RecordTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/RecordTest.java
new file mode 100644
index 0000000..c6dc297
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/RecordTest.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Checks that a {@link RecordKey} built from a {@link Record} can be used as a hash-map key:
+ * a second key built from the same record must find the stored entry.
+ *
+ * <p>NOTE(review): key indexes below are assumed to be 0-based positions of the condition
+ * fields within {@code selectFields}; confirm against LookupSchema / RecordKey.
+ */
+public class RecordTest {
+
+    private LookupSchema schema;
+
+    @Before
+    public void before(){
+        String tableIdentifier = "db.tbl";
+        String[] selectFields = new String[]{"a","b","c"};
+        DataType[] fieldTypes = new DataType[]{DataTypes.INT(),DataTypes.STRING(),DataTypes.DOUBLE()};
+        schema = new LookupSchema(tableIdentifier,selectFields,fieldTypes,null,null);
+    }
+
+    @Test
+    public void testLookupOneKey(){
+        String[] conditionFields = new String[]{"a"};
+        // "a" is column 0 of selectFields.
+        int[] keyIndex = new int[]{0};
+        Object[] values = new Object[schema.getFieldTypes().length];
+        values[0] = 1001;
+        Record record = appendValues(conditionFields, keyIndex, values);
+
+        Map<RecordKey, Record> map = new HashMap<>();
+        map.put(new RecordKey(record), record);
+        Assert.assertSame(record, map.get(new RecordKey(record)));
+    }
+
+    @Test
+    public void testLookupTwoKey(){
+        String[] conditionFields = new String[]{"a","b"};
+        // "a" and "b" are columns 0 and 1 of selectFields.
+        int[] keyIndex = new int[]{0,1};
+        Object[] values = new Object[schema.getFieldTypes().length];
+        values[0] = 1001;
+        values[1] = "doris";
+        Record record = appendValues(conditionFields, keyIndex, values);
+
+        Map<RecordKey, Record> map = new HashMap<>();
+        map.put(new RecordKey(record), record);
+        Assert.assertSame(record, map.get(new RecordKey(record)));
+    }
+
+    @Test
+    public void testLookupOnlyTwoKey(){
+        String[] conditionFields = new String[]{"b"};
+        // "b" is column 1 of selectFields, so the string value goes into slot 1.
+        int[] keyIndex = new int[]{1};
+        Object[] values = new Object[schema.getFieldTypes().length];
+        values[1] = "doris";
+        Record record = appendValues(conditionFields, keyIndex, values);
+
+        Map<RecordKey, Record> map = new HashMap<>();
+        map.put(new RecordKey(record), record);
+        Assert.assertSame(record, map.get(new RecordKey(record)));
+    }
+
+    /**
+     * Configures the shared schema with the given key columns and builds a record whose
+     * column {@code i} holds {@code values[i]}.
+     *
+     * @param conditionFields names of the key columns
+     * @param keyIndex indexes of the key columns
+     * @param values one value per schema column; unset columns stay {@code null}
+     * @return the populated record
+     */
+    private Record appendValues(String[] conditionFields, int[] keyIndex, Object[] values){
+        schema.setKeyIndex(keyIndex);
+        schema.setConditionFields(conditionFields);
+        Record record = new Record(schema);
+        for(int i=0;i<schema.getFieldTypes().length;i++){
+            // Store the value for this column, not the whole values array.
+            record.setObject(i,values[i]);
+        }
+        return record;
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org