You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/09 14:13:01 UTC
[incubator-doris] branch master updated: [Feature]:Flink-connector
supports streamload parameters (#6243)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d9fc1bf [Feature]:Flink-connector supports streamload parameters (#6243)
d9fc1bf is described below
commit d9fc1bf3ca67b2f0d8925425cf9a4362829f8b62
Author: wudi <67...@qq.com>
AuthorDate: Mon Aug 9 22:12:46 2021 +0800
[Feature]:Flink-connector supports streamload parameters (#6243)
Flink-connector supports streamload parameters
#6199
---
docs/en/extending-doris/flink-doris-connector.md | 1 +
.../zh-CN/extending-doris/flink-doris-connector.md | 1 +
.../apache/doris/flink/backend/BackendClient.java | 5 +-
.../doris/flink/cfg/DorisConnectionOptions.java | 2 +-
.../doris/flink/cfg/DorisExecutionOptions.java | 23 +-
.../org/apache/doris/flink/cfg/DorisOptions.java | 2 +-
.../apache/doris/flink/cfg/DorisReadOptions.java | 8 +-
.../flink/datastream/DorisSourceFunction.java | 16 +-
.../SimpleListDeserializationSchema.java | 3 +-
.../doris/flink/exception/DorisException.java | 8 +-
.../exception/ShouldNeverHappenException.java | 3 +-
.../doris/flink/exception/StreamLoadException.java | 4 +
.../doris/flink/rest/PartitionDefinition.java | 2 +-
.../org/apache/doris/flink/rest/RestService.java | 95 +++--
.../org/apache/doris/flink/rest/SchemaUtils.java | 3 +-
.../org/apache/doris/flink/rest/models/Field.java | 3 +-
.../apache/doris/flink/serialization/RowBatch.java | 8 +-
.../flink/table/DorisDynamicOutputFormat.java | 57 +--
.../flink/table/DorisDynamicTableFactory.java | 427 +++++++++++----------
.../doris/flink/table/DorisDynamicTableSink.java | 2 +-
.../doris/flink/table/DorisDynamicTableSource.java | 90 ++---
.../doris/flink/table/DorisRowDataInputFormat.java | 358 ++++++++---------
.../apache/doris/flink/table/DorisStreamLoad.java | 35 +-
.../doris/flink/table/DorisTableInputSplit.java | 8 +-
24 files changed, 626 insertions(+), 538 deletions(-)
diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index 39bbe41..d17fb8a 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -131,6 +131,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| sink.batch.size | 100 | Maximum number of lines in a single write BE |
| sink.max-retries | 1 | Number of retries after writing BE failed |
| sink.batch.interval | 1s | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
+| sink.properties.* | -- | The stream load parameters. E.g.: 'sink.properties.column_separator' = ','. |
## Doris & Flink Column Type Mapping
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index 22363db..ce89964 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -135,6 +135,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
| sink.batch.size | 100 | 单次写BE的最大行数 |
| sink.max-retries | 1 | 写BE失败之后的重试次数 |
| sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
+| sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ',' 等 |
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 93b353c..40bb5c9 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -112,6 +112,7 @@ public class BackendClient {
/**
* Open a scanner for reading Doris data.
+ *
* @param openParams thrift struct to required by request
* @return scan open result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -147,6 +148,7 @@ public class BackendClient {
/**
* get next row batch from Doris BE
+ *
* @param nextBatchParams thrift struct to required by request
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -161,7 +163,7 @@ public class BackendClient {
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
- result = client.get_next(nextBatchParams);
+ result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@@ -189,6 +191,7 @@ public class BackendClient {
/**
* close an scanner.
+ *
* @param closeParams thrift struct to required by request
*/
public void closeScanner(TScanCloseParams closeParams) {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 619ce74..9b2187c 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -21,7 +21,7 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
/**
- * Doris connection options.
+ * Doris connection options.
*/
public class DorisConnectionOptions implements Serializable {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 330cbc9..3d035ab 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -21,22 +21,29 @@ import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.time.Duration;
+import java.util.Properties;
/**
* JDBC sink batch options.
*/
-public class DorisExecutionOptions implements Serializable {
+public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;
- public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
+ /**
+ * Properties for the StreamLoad.
+ */
+ private final Properties streamLoadProp;
+
+ public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
Preconditions.checkArgument(maxRetries >= 0);
this.batchSize = batchSize;
this.maxRetries = maxRetries;
this.batchIntervalMs = batchIntervalMs;
+ this.streamLoadProp = streamLoadProp;
}
public Integer getBatchSize() {
@@ -51,6 +58,10 @@ public class DorisExecutionOptions implements Serializable {
return batchIntervalMs;
}
+ public Properties getStreamLoadProp() {
+ return streamLoadProp;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -62,6 +73,7 @@ public class DorisExecutionOptions implements Serializable {
private Integer batchSize;
private Integer maxRetries;
private Long batchIntervalMs;
+ private Properties streamLoadProp;
public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
@@ -78,8 +90,13 @@ public class DorisExecutionOptions implements Serializable {
return this;
}
+ public Builder setStreamLoadProp(Properties streamLoadProp) {
+ this.streamLoadProp = streamLoadProp;
+ return this;
+ }
+
public DorisExecutionOptions build() {
- return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
+ return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp);
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index c542d6b..512d0ab 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -25,7 +25,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Options for the Doris connector.
*/
-public class DorisOptions extends DorisConnectionOptions{
+public class DorisOptions extends DorisConnectionOptions {
private static final long serialVersionUID = 1L;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 833ecf5..53cefaa 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
/**
* Doris read Options
*/
-public class DorisReadOptions implements Serializable {
+public class DorisReadOptions implements Serializable {
private static final long serialVersionUID = 1L;
@@ -35,7 +35,7 @@ public class DorisReadOptions implements Serializable {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
- private Integer deserializeQueueSize;
+ private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs,
@@ -117,7 +117,7 @@ public class DorisReadOptions implements Serializable {
private Integer requestRetries;
private Integer requestBatchSize;
private Long execMemLimit;
- private Integer deserializeQueueSize;
+ private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
@@ -177,7 +177,7 @@ public class DorisReadOptions implements Serializable {
}
public DorisReadOptions build() {
- return new DorisReadOptions(readFields,filterQuery,requestTabletSize,requestConnectTimeoutMs,requestReadTimeoutMs,requestQueryTimeoutS,requestRetries,requestBatchSize,execMemLimit,deserializeQueueSize,deserializeArrowAsync);
+ return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 5c580db..82ab224 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -38,15 +38,15 @@ import java.util.List;
public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
- private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
+ private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
private DorisDeserializationSchema deserializer;
private DorisOptions options;
private DorisReadOptions readOptions;
- private List<PartitionDefinition> dorisPartitions;
+ private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;
- public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
+ public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
this.deserializer = deserializer;
this.options = streamOptions.getOptions();
this.readOptions = streamOptions.getReadOptions();
@@ -55,14 +55,14 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- this.dorisPartitions = RestService.findPartitions(options,readOptions,logger);
+ this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
}
@Override
- public void run(SourceContext sourceContext) throws Exception{
- for(PartitionDefinition partitions : dorisPartitions){
- scalaValueReader = new ScalaValueReader(partitions, options,readOptions);
- while (scalaValueReader.hasNext()){
+ public void run(SourceContext sourceContext) throws Exception {
+ for (PartitionDefinition partitions : dorisPartitions) {
+ scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
+ while (scalaValueReader.hasNext()) {
Object next = scalaValueReader.next();
sourceContext.collect(next);
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index 6fd68ec..7fcf2f6 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -18,10 +18,11 @@ package org.apache.doris.flink.deserialization;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+
import java.util.List;
-public class SimpleListDeserializationSchema implements DorisDeserializationSchema{
+public class SimpleListDeserializationSchema implements DorisDeserializationSchema {
@Override
public TypeInformation getProducedType() {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
index 26e11e5..2274f87 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
@@ -21,18 +21,22 @@ public class DorisException extends Exception {
public DorisException() {
super();
}
+
public DorisException(String message) {
super(message);
}
+
public DorisException(String message, Throwable cause) {
super(message, cause);
}
+
public DorisException(Throwable cause) {
super(cause);
}
+
protected DorisException(String message, Throwable cause,
- boolean enableSuppression,
- boolean writableStackTrace) {
+ boolean enableSuppression,
+ boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
index 307c398..a26718d 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
@@ -17,4 +17,5 @@
package org.apache.doris.flink.exception;
-public class ShouldNeverHappenException extends DorisException { }
+public class ShouldNeverHappenException extends DorisException {
+}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
index 6d7a192..233d27e 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
@@ -21,15 +21,19 @@ public class StreamLoadException extends Exception {
public StreamLoadException() {
super();
}
+
public StreamLoadException(String message) {
super(message);
}
+
public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}
+
public StreamLoadException(Throwable cause) {
super(cause);
}
+
protected StreamLoadException(String message, Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
index 19edd21..8a66f76 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
@@ -103,7 +103,7 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe
similar.retainAll(o.tabletIds);
diffSelf.removeAll(similar);
diffOther.removeAll(similar);
- if (diffSelf.size() == 0) {
+ if (diffSelf.size() == 0) {
return 0;
}
long diff = Collections.min(diffSelf) - Collections.min(diffOther);
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index cd5b6d5..184afd3 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -88,13 +88,14 @@ public class RestService implements Serializable {
/**
* send request to Doris FE and get response json string.
+ *
* @param options configuration of request
* @param request {@link HttpRequestBase} real request
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return Doris FE response in json string
* @throws ConnectedFailedException throw when cannot connect to Doris FE
*/
- private static String send(DorisOptions options,DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
+ private static String send(DorisOptions options, DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
ConnectedFailedException {
int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
@@ -116,10 +117,10 @@ public class RestService implements Serializable {
logger.debug("Attempt {} to request {}.", attempt, request.getURI());
try {
String response;
- if (request instanceof HttpGet){
- response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(),logger);
+ if (request instanceof HttpGet) {
+ response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger);
} else {
- response = getConnectionPost(request, options.getUsername(), options.getPassword(),logger);
+ response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger);
}
if (response == null) {
logger.warn("Failed to get response from Doris FE {}, http code is {}",
@@ -147,14 +148,14 @@ public class RestService implements Serializable {
throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
}
- private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
+ private static String getConnectionPost(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
URL url = new URL(request.getURI().toString());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod(request.getMethod());
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
- InputStream content = ((HttpPost)request).getEntity().getContent();
+ InputStream content = ((HttpPost) request).getEntity().getContent();
String res = IOUtils.toString(content);
conn.setDoOutput(true);
conn.setDoInput(true);
@@ -164,21 +165,21 @@ public class RestService implements Serializable {
// flush
out.flush();
// read response
- return parseResponse(conn,logger);
+ return parseResponse(conn, logger);
}
- private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
+ private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException {
URL realUrl = new URL(request);
// open connection
- HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
+ HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
connection.setRequestProperty("Authorization", "Basic " + authEncoding);
connection.connect();
- return parseResponse(connection,logger);
+ return parseResponse(connection, logger);
}
- private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
+ private static String parseResponse(HttpURLConnection connection, Logger logger) throws IOException {
if (connection.getResponseCode() != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris {}, http code is {}",
connection.getURL(), connection.getResponseCode());
@@ -198,8 +199,9 @@ public class RestService implements Serializable {
/**
* parse table identifier to array.
+ *
* @param tableIdentifier table identifier string
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return first element is db name, second element is table name
* @throws IllegalArgumentException table identifier is illegal
*/
@@ -220,8 +222,9 @@ public class RestService implements Serializable {
/**
* choice a Doris FE node to request.
+ *
* @param feNodes Doris FE node list, separate be comma
- * @param logger slf4j logger
+ * @param logger slf4j logger
* @return the chosen one Doris FE node
* @throws IllegalArgumentException fe nodes is illegal
*/
@@ -239,14 +242,15 @@ public class RestService implements Serializable {
/**
* choice a Doris BE node to request.
+ *
* @param options configuration of request
- * @param logger slf4j logger
+ * @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
- public static String randomBackend(DorisOptions options,DorisReadOptions readOptions ,Logger logger) throws DorisException, IOException {
- List<BackendRow> backends = getBackends(options,readOptions, logger);
+ public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+ List<BackendRow> backends = getBackends(options, readOptions, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
@@ -259,19 +263,20 @@ public class RestService implements Serializable {
/**
* get Doris BE nodes to request.
+ *
* @param options configuration of request
- * @param logger slf4j logger
+ * @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
- static List<BackendRow> getBackends(DorisOptions options,DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+ static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
String feNode = randomEndpoint(feNodes, logger);
String beUrl = "http://" + feNode + BACKENDS;
HttpGet httpGet = new HttpGet(beUrl);
- String response = send(options, readOptions,httpGet, logger);
- logger.info("Backend Info:{}",response);
+ String response = send(options, readOptions, httpGet, logger);
+ logger.info("Backend Info:{}", response);
List<BackendRow> backends = parseBackend(response, logger);
return backends;
}
@@ -306,8 +311,9 @@ public class RestService implements Serializable {
/**
* get a valid URI to connect Doris FE.
+ *
* @param options configuration of request
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return uri string
* @throws IllegalArgumentException throw when configuration is illegal
*/
@@ -323,24 +329,26 @@ public class RestService implements Serializable {
/**
* discover Doris table schema from Doris FE.
+ *
* @param options configuration of request
- * @param logger slf4j logger
+ * @param logger slf4j logger
* @return Doris table schema
* @throws DorisException throw when discover failed
*/
- public static Schema getSchema(DorisOptions options,DorisReadOptions readOptions, Logger logger)
+ public static Schema getSchema(DorisOptions options, DorisReadOptions readOptions, Logger logger)
throws DorisException {
logger.trace("Finding schema.");
HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
- String response = send(options, readOptions,httpGet, logger);
+ String response = send(options, readOptions, httpGet, logger);
logger.debug("Find schema response is '{}'.", response);
return parseSchema(response, logger);
}
/**
* translate Doris FE response to inner {@link Schema} struct.
+ *
* @param response Doris FE response
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return inner {@link Schema} struct
* @throws DorisException throw when translate failed
*/
@@ -381,14 +389,15 @@ public class RestService implements Serializable {
/**
* find Doris RDD partitions from Doris FE.
+ *
* @param options configuration of request
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return an list of Doris RDD partitions
* @throws DorisException throw when find partition failed
*/
public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
- String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" :readOptions.getReadFields();
+ String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" : readOptions.getReadFields();
String sql = "select " + readFields +
" from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
@@ -397,14 +406,14 @@ public class RestService implements Serializable {
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
- String entity = "{\"sql\": \""+ sql +"\"}";
+ String entity = "{\"sql\": \"" + sql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
- String resStr = send(options, readOptions,httpPost, logger);
+ String resStr = send(options, readOptions, httpPost, logger);
logger.debug("Find partition response is '{}'.", resStr);
QueryPlan queryPlan = getQueryPlan(resStr, logger);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
@@ -420,8 +429,9 @@ public class RestService implements Serializable {
/**
* translate Doris FE response string to inner {@link QueryPlan} struct.
+ *
* @param response Doris FE response string
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return inner {@link QueryPlan} struct
* @throws DorisException throw when translate failed.
*/
@@ -461,13 +471,14 @@ public class RestService implements Serializable {
/**
* select which Doris BE to get tablet data.
+ *
* @param queryPlan {@link QueryPlan} translated from Doris FE response
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return BE to tablets {@link Map}
* @throws DorisException throw when select failed.
*/
@VisibleForTesting
- static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
+ static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
Map<String, List<Long>> be2Tablets = new HashMap<>();
for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
logger.debug("Parse tablet info: '{}'.", part);
@@ -512,8 +523,9 @@ public class RestService implements Serializable {
/**
* tablet count limit for one Doris RDD partition
+ *
* @param readOptions configuration of request
- * @param logger {@link Logger}
+ * @param logger {@link Logger}
* @return tablet count limit
*/
@VisibleForTesting
@@ -533,18 +545,19 @@ public class RestService implements Serializable {
/**
* translate BE tablets map to Doris RDD partition.
- * @param options configuration of request
- * @param be2Tablets BE to tablets {@link Map}
+ *
+ * @param options configuration of request
+ * @param be2Tablets BE to tablets {@link Map}
* @param opaquedQueryPlan Doris BE execute plan getting from Doris FE
- * @param database database name of Doris table
- * @param table table name of Doris table
- * @param logger {@link Logger}
+ * @param database database name of Doris table
+ * @param table table name of Doris table
+ * @param logger {@link Logger}
* @return Doris RDD partition {@link List}
* @throws IllegalArgumentException throw when translate failed
*/
@VisibleForTesting
- static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options,DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets,
- String opaquedQueryPlan, String database, String table, Logger logger)
+ static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options, DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets,
+ String opaquedQueryPlan, String database, String table, Logger logger)
throws IllegalArgumentException {
int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
List<PartitionDefinition> partitions = new ArrayList<>();
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
index 46c54c0..13bde01 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
@@ -27,10 +27,11 @@ public class SchemaUtils {
/**
* convert Doris return schema to inner schema struct.
+ *
* @param tscanColumnDescs Doris BE return schema
* @return inner schema struct
*/
- public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs ){
+ public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
Schema schema = new Schema(tscanColumnDescs.size());
tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0)));
return schema;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
index 4ac66be..9a58180 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
@@ -26,7 +26,8 @@ public class Field {
private int precision;
private int scale;
- public Field() { }
+ public Field() {
+ }
public Field(String name, String type, String comment, int precision, int scale) {
this.name = name;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 8d81a37..00c699b 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -63,7 +63,7 @@ public class RowBatch {
this.cols = new ArrayList<>(colCount);
}
- public List<Object> getCols() {
+ public List<Object> getCols() {
return cols;
}
@@ -87,13 +87,13 @@ public class RowBatch {
return rowBatch;
}
- public RowBatch(TScanBatchResult nextResult, Schema schema){
+ public RowBatch(TScanBatchResult nextResult, Schema schema) {
this.schema = schema;
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
this.arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(nextResult.getRows()),
rootAllocator
- );
+ );
this.offsetInRowBatch = 0;
}
@@ -243,7 +243,7 @@ public class RowBatch {
continue;
}
BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
- addValueToRow(rowIndex, DecimalData.fromBigDecimal(value,value.precision(),value.scale()));
+ addValueToRow(rowIndex, DecimalData.fromBigDecimal(value, value.precision(), value.scale()));
}
break;
case "DATE":
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 33f5c85..77b53ba 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -44,17 +45,23 @@ import java.util.concurrent.TimeUnit;
/**
* DorisDynamicOutputFormat
**/
-public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
+public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
-
- private DorisOptions options ;
- private DorisReadOptions readOptions;
- private DorisExecutionOptions executionOptions;
+ private static final String FIELD_DELIMITER_KEY = "column_separator";
+ private static final String FIELD_DELIMITER_DEFAULT = "\t";
+ private static final String LINE_DELIMITER_KEY = "line_delimiter";
+ private static final String LINE_DELIMITER_DEFAULT = "\n";
+ private static final String NULL_VALUE = "\\N";
+ private final String fieldDelimiter;
+ private final String lineDelimiter;
+
+ private DorisOptions options;
+ private DorisReadOptions readOptions;
+ private DorisExecutionOptions executionOptions;
private DorisStreamLoad dorisStreamLoad;
- private final String fieldDelimiter = "\t";
- private final String lineDelimiter = "\n";
- private final String NULL_VALUE = "\\N";
+
+
private final List<String> batch = new ArrayList<>();
private transient volatile boolean closed = false;
@@ -62,15 +69,16 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile Exception flushException;
- public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) {
+ public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
+ this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
+ this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
}
@Override
public void configure(Configuration configuration) {
-
}
@Override
@@ -80,8 +88,9 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
options.getTableIdentifier().split("\\.")[0],
options.getTableIdentifier().split("\\.")[1],
options.getUsername(),
- options.getPassword());
- LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());
+ options.getPassword(),
+ executionOptions.getStreamLoadProp());
+ LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
@@ -118,12 +127,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
private void addBatch(RowData row) {
StringJoiner value = new StringJoiner(this.fieldDelimiter);
GenericRowData rowData = (GenericRowData) row;
- for(int i = 0; i < row.getArity(); ++i) {
+ for (int i = 0; i < row.getArity(); ++i) {
Object field = rowData.getField(i);
- if(field != null){
+ if (field != null) {
value.add(field.toString());
- }else{
- value.add(this.NULL_VALUE);
+ } else {
+ value.add(NULL_VALUE);
}
}
batch.add(value.toString());
@@ -151,12 +160,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
public synchronized void flush() throws IOException {
checkFlushException();
- if(batch.isEmpty()){
+ if (batch.isEmpty()) {
return;
}
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
- dorisStreamLoad.load(String.join(lineDelimiter,batch));
+ dorisStreamLoad.load(String.join(this.lineDelimiter, batch));
batch.clear();
break;
} catch (StreamLoadException e) {
@@ -166,7 +175,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
}
try {
dorisStreamLoad.setHostPort(getBackend());
- LOG.warn("streamload error,switch be: {}",dorisStreamLoad.getLoadUrlStr(), e);
+ LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
Thread.sleep(1000 * i);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
@@ -177,10 +186,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
}
- private String getBackend() throws IOException{
+ private String getBackend() throws IOException {
try {
//get be url from fe
- return RestService.randomBackend(options,readOptions, LOG);
+ return RestService.randomBackend(options, readOptions, LOG);
} catch (IOException | DorisException e) {
LOG.error("get backends info fail");
throw new IOException(e);
@@ -202,8 +211,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
*/
public static class Builder {
private DorisOptions.Builder optionsBuilder;
- private DorisReadOptions readOptions;
- private DorisExecutionOptions executionOptions;
+ private DorisReadOptions readOptions;
+ private DorisExecutionOptions executionOptions;
public Builder() {
this.optionsBuilder = DorisOptions.builder();
@@ -241,7 +250,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
public DorisDynamicOutputFormat build() {
return new DorisDynamicOutputFormat(
- optionsBuilder.build(),readOptions,executionOptions
+ optionsBuilder.build(), readOptions, executionOptions
);
}
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 27b6f97..92d69e6 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.utils.TableSchemaUtils;
import java.time.Duration;
import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
@@ -52,209 +54,224 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
* <p>Because the table source requires a decoding format, we are discovering the format using the
* provided {@link FactoryUtil} for convenience.
*/
-public final class DorisDynamicTableFactory implements DynamicTableSourceFactory , DynamicTableSinkFactory {
-
- public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
- public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
- public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
- public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
-
- // doris options
- private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
- .key("doris.read.field")
- .stringType()
- .noDefaultValue()
- .withDescription("List of column names in the Doris table, separated by commas");
-
- private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
- .key("doris.filter.query")
- .stringType()
- .noDefaultValue()
- .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
-
- private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
- .key("doris.request.tablet.size")
- .intType()
- .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
- .key("doris.request.connect.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
- .key("doris.request.read.timeout.ms")
- .intType()
- .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
- .key("doris.request.query.timeout.s")
- .intType()
- .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
- .key("doris.request.retries")
- .intType()
- .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
- .key("doris.deserialize.arrow.async")
- .booleanType()
- .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
- .key("doris.request.retriesdoris.deserialize.queue.size")
- .intType()
- .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
- .withDescription("");
-
-
- private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
- .key("doris.batch.size")
- .intType()
- .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
- .withDescription("");
-
- private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
- .key("doris.exec.mem.limit")
- .longType()
- .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
- .withDescription("");
-
- // flink write config options
- private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
- .key("sink.batch.size")
- .intType()
- .defaultValue(100)
- .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
- " of records, will flush data. The default value is 100.");
-
- private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
- .key("sink.max-retries")
- .intType()
- .defaultValue(3)
- .withDescription("the max retry times if writing records to database failed.");
-
- private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
- .key("sink.batch.interval")
- .durationType()
- .defaultValue(Duration.ofSeconds(1))
- .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
- "default value is 1s.");
-
-
- @Override
- public String factoryIdentifier() {
- return "doris"; // used for matching to `connector = '...'`
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FENODES);
- options.add(TABLE_IDENTIFIER);
- return options;
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FENODES);
- options.add(TABLE_IDENTIFIER);
- options.add(USERNAME);
- options.add(PASSWORD);
-
- options.add(DORIS_READ_FIELD);
- options.add(DORIS_FILTER_QUERY);
- options.add(DORIS_TABLET_SIZE);
- options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
- options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
- options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
- options.add(DORIS_REQUEST_RETRIES);
- options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
- options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
- options.add(DORIS_BATCH_SIZE);
- options.add(DORIS_EXEC_MEM_LIMIT);
-
- options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
- options.add(SINK_MAX_RETRIES);
- options.add(SINK_BUFFER_FLUSH_INTERVAL);
- return options;
- }
-
- @Override
- public DynamicTableSource createDynamicTableSource(Context context) {
- // either implement your custom validation logic here ...
- // or use the provided helper utility
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- // validate all options
- helper.validate();
- // get the validated options
- final ReadableConfig options = helper.getOptions();
- // derive the produced data type (excluding computed columns) from the catalog table
- final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
- TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
- // create and return dynamic table source
- return new DorisDynamicTableSource(
- getDorisOptions(helper.getOptions()),
- getDorisReadOptions(helper.getOptions()),
- physicalSchema);
- }
-
- private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
- final String fenodes = readableConfig.get(FENODES);
- final DorisOptions.Builder builder = DorisOptions.builder()
- .setFenodes(fenodes)
- .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
-
- readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
- readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
- return builder.build();
- }
-
- private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
- final DorisReadOptions.Builder builder = DorisReadOptions.builder();
- builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
- .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
- .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
- .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
- .setReadFields(readableConfig.get(DORIS_READ_FIELD))
- .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
- .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
- .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
- .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
- .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
- .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
- return builder.build();
- }
-
- private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig) {
- final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
- builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
- builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
- builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
- return builder.build();
- }
-
-
- @Override
- public DynamicTableSink createDynamicTableSink(Context context) {
- // either implement your custom validation logic here ...
- // or use the provided helper utility
- final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
- // validate all options
- helper.validate();
- // create and return dynamic table source
- return new DorisDynamicTableSink(
- getDorisOptions(helper.getOptions()),
- getDorisReadOptions(helper.getOptions()),
- getDorisExecutionOptions(helper.getOptions())
- );
- }
+public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
+ public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
+ public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
+ public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
+
+ // doris options
+ private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
+ .key("doris.read.field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("List of column names in the Doris table, separated by commas");
+
+ private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
+ .key("doris.filter.query")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+
+ private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
+ .key("doris.request.tablet.size")
+ .intType()
+ .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
+ .key("doris.request.connect.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
+ .key("doris.request.read.timeout.ms")
+ .intType()
+ .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
+ .key("doris.request.query.timeout.s")
+ .intType()
+ .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
+ .key("doris.request.retries")
+ .intType()
+ .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
+ .key("doris.deserialize.arrow.async")
+ .booleanType()
+ .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
+ .key("doris.request.retriesdoris.deserialize.queue.size")
+ .intType()
+ .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+ .withDescription("");
+
+
+ private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
+ .key("doris.batch.size")
+ .intType()
+ .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+ .withDescription("");
+
+ private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
+ .key("doris.exec.mem.limit")
+ .longType()
+ .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+ .withDescription("");
+
+ // flink write config options
+ private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
+ .key("sink.batch.size")
+ .intType()
+ .defaultValue(100)
+ .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
+ " of records, will flush data. The default value is 100.");
+
+ private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
+ .key("sink.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the max retry times if writing records to database failed.");
+
+ private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
+ .key("sink.batch.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
+ "default value is 1s.");
+
+
+ // Prefix for Doris StreamLoad specific properties.
+ public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+
+ @Override
+ public String factoryIdentifier() {
+ return "doris"; // used for matching to `connector = '...'`
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FENODES);
+ options.add(TABLE_IDENTIFIER);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FENODES);
+ options.add(TABLE_IDENTIFIER);
+ options.add(USERNAME);
+ options.add(PASSWORD);
+
+ options.add(DORIS_READ_FIELD);
+ options.add(DORIS_FILTER_QUERY);
+ options.add(DORIS_TABLET_SIZE);
+ options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
+ options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
+ options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
+ options.add(DORIS_REQUEST_RETRIES);
+ options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
+ options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
+ options.add(DORIS_BATCH_SIZE);
+ options.add(DORIS_EXEC_MEM_LIMIT);
+
+ options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+ options.add(SINK_MAX_RETRIES);
+ options.add(SINK_BUFFER_FLUSH_INTERVAL);
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ // either implement your custom validation logic here ...
+ // or use the provided helper utility
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ // validate all options
+ helper.validate();
+ // get the validated options
+ final ReadableConfig options = helper.getOptions();
+ // derive the produced data type (excluding computed columns) from the catalog table
+ final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+ TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+ // create and return dynamic table source
+ return new DorisDynamicTableSource(
+ getDorisOptions(helper.getOptions()),
+ getDorisReadOptions(helper.getOptions()),
+ physicalSchema);
+ }
+
+ private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
+ final String fenodes = readableConfig.get(FENODES);
+ final DorisOptions.Builder builder = DorisOptions.builder()
+ .setFenodes(fenodes)
+ .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
+
+ readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+ readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+ return builder.build();
+ }
+
+ private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
+ final DorisReadOptions.Builder builder = DorisReadOptions.builder();
+ builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
+ .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
+ .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+ .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+ .setReadFields(readableConfig.get(DORIS_READ_FIELD))
+ .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+ .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
+ .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
+ .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+ .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
+ .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+ return builder.build();
+ }
+
+ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig, Properties streamLoadProp) {
+ final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
+ builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+ builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
+ builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+ builder.setStreamLoadProp(streamLoadProp);
+ return builder.build();
+ }
+
+ private Properties getStreamLoadProp(Map<String, String> tableOptions) {
+ final Properties streamLoadProp = new Properties();
+
+ for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+ if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
+ String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length());
+ streamLoadProp.put(subKey, entry.getValue());
+ }
+ }
+ return streamLoadProp;
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ // validate all options
+ helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+
+ Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
+ // create and return dynamic table source
+ return new DorisDynamicTableSink(
+ getDorisOptions(helper.getOptions()),
+ getDorisReadOptions(helper.getOptions()),
+ getDorisExecutionOptions(helper.getOptions(), streamLoadProp)
+ );
+ }
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index feeab96..dc710d7 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -63,7 +63,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(options,readOptions,executionOptions);
+ return new DorisDynamicTableSink(options, readOptions, executionOptions);
}
@Override
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index ab523aa..43d9e5f 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -46,56 +46,56 @@ import java.util.List;
* where we instantiate the required {@link SourceFunction} and its {@link DeserializationSchema} for
* runtime. Both instances are parameterized to return internal data structures (i.e. {@link RowData}).
*/
-public final class DorisDynamicTableSource implements ScanTableSource ,LookupTableSource {
+public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource {
- private final DorisOptions options;
- private final DorisReadOptions readOptions;
- private TableSchema physicalSchema;
- private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
+ private final DorisOptions options;
+ private final DorisReadOptions readOptions;
+ private TableSchema physicalSchema;
+ private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
- public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions,TableSchema physicalSchema) {
- this.options = options;
- this.readOptions = readOptions;
- this.physicalSchema = physicalSchema;
- }
+ public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions, TableSchema physicalSchema) {
+ this.options = options;
+ this.readOptions = readOptions;
+ this.physicalSchema = physicalSchema;
+ }
- @Override
- public ChangelogMode getChangelogMode() {
- // in our example the format decides about the changelog mode
- // but it could also be the source itself
- return ChangelogMode.insertOnly();
- }
+ @Override
+ public ChangelogMode getChangelogMode() {
+ // in our example the format decides about the changelog mode
+ // but it could also be the source itself
+ return ChangelogMode.insertOnly();
+ }
- @Override
- public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
- List<PartitionDefinition> dorisPartitions ;
- try {
- dorisPartitions = RestService.findPartitions(options,readOptions,LOG);
- } catch (DorisException e) {
- throw new RuntimeException("can not fetch partitions");
- }
- DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
- .setFenodes(options.getFenodes())
- .setUsername(options.getUsername())
- .setPassword(options.getPassword())
- .setTableIdentifier(options.getTableIdentifier())
- .setPartitions(dorisPartitions)
- .setReadOptions(readOptions);
- return InputFormatProvider.of(builder.build());
- }
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ List<PartitionDefinition> dorisPartitions;
+ try {
+ dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
+ } catch (DorisException e) {
+ throw new RuntimeException("can not fetch partitions");
+ }
+ DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
+ .setFenodes(options.getFenodes())
+ .setUsername(options.getUsername())
+ .setPassword(options.getPassword())
+ .setTableIdentifier(options.getTableIdentifier())
+ .setPartitions(dorisPartitions)
+ .setReadOptions(readOptions);
+ return InputFormatProvider.of(builder.build());
+ }
- @Override
- public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
- return null;
- }
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
+ return null;
+ }
- @Override
- public DynamicTableSource copy() {
- return new DorisDynamicTableSource(options,readOptions,physicalSchema);
- }
+ @Override
+ public DynamicTableSource copy() {
+ return new DorisDynamicTableSource(options, readOptions, physicalSchema);
+ }
- @Override
- public String asSummaryString() {
- return "Doris Table Source";
- }
+ @Override
+ public String asSummaryString() {
+ return "Doris Table Source";
+ }
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index 75e7fc9..c75a88f 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -45,183 +45,183 @@ import java.util.List;
@Internal
public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTableInputSplit> implements ResultTypeQueryable<RowData> {
- private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
-
- private DorisOptions options;
- private DorisReadOptions readOptions;
- private List<PartitionDefinition> dorisPartitions;
- private TypeInformation<RowData> rowDataTypeInfo;
-
- private ScalaValueReader scalaValueReader;
- private transient boolean hasNext;
-
- public DorisRowDataInputFormat(DorisOptions options,List<PartitionDefinition> dorisPartitions,DorisReadOptions readOptions) {
- this.options = options;
- this.dorisPartitions = dorisPartitions;
- this.readOptions = readOptions;
- }
-
- @Override
- public void configure(Configuration parameters) {
- //do nothing here
- }
-
- @Override
- public void openInputFormat() {
- //called once per inputFormat (on open)
- }
-
- @Override
- public void closeInputFormat() {
- //called once per inputFormat (on close)
- }
-
- /**
- * Connects to the source database and executes the query in a <b>parallel
- * fashion</b> if
- * this {@link InputFormat} is built using a parameterized query (i.e. using
- * a {@link PreparedStatement})
- * and a proper {@link }, in a <b>non-parallel
- * fashion</b> otherwise.
- *
- * @param inputSplit which is ignored if this InputFormat is executed as a
- * non-parallel source,
- * a "hook" to the query parameters otherwise (using its
- * <i>splitNumber</i>)
- * @throws IOException if there's an error during the execution of the query
- */
- @Override
- public void open(DorisTableInputSplit inputSplit) throws IOException {
- scalaValueReader = new ScalaValueReader(inputSplit.partition, options,readOptions);
- hasNext = scalaValueReader.hasNext();
- }
-
- /**
- * Closes all resources used.
- *
- * @throws IOException Indicates that a resource could not be closed.
- */
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public TypeInformation<RowData> getProducedType() {
- return rowDataTypeInfo;
- }
-
- /**
- * Checks whether all data has been read.
- *
- * @return boolean value indication whether all data has been read.
- * @throws IOException
- */
- @Override
- public boolean reachedEnd() throws IOException {
- return !hasNext;
- }
-
- /**
- * Stores the next resultSet row in a tuple.
- *
- * @param reuse row to be reused.
- * @return row containing next {@link RowData}
- * @throws IOException
- */
- @Override
- public RowData nextRecord(RowData reuse) throws IOException {
- if (!hasNext) {
- return null;
- }
- List next = (List)scalaValueReader.next();
- GenericRowData genericRowData = new GenericRowData(next.size());
- for(int i =0;i<next.size();i++){
- genericRowData.setField(i, next.get(i));
- }
- //update hasNext after we've read the record
- hasNext = scalaValueReader.hasNext();
- return genericRowData;
- }
-
- @Override
- public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
- return cachedStatistics;
- }
-
- @Override
- public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
- List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
- int splitNum = 0;
- for (PartitionDefinition partition : dorisPartitions) {
- dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
- }
- LOG.info("DorisTableInputSplit Num:{}",dorisSplits.size());
- return dorisSplits.toArray(new DorisTableInputSplit[0]);
- }
-
- @Override
- public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) {
- return new DefaultInputSplitAssigner(inputSplits);
- }
-
- /**
- * A builder used to set parameters to the output format's configuration in a fluent way.
- *
- * @return builder
- */
- public static Builder builder() {
- return new Builder();
- }
-
- /**
- * Builder for {@link DorisRowDataInputFormat}.
- */
- public static class Builder {
- private DorisOptions.Builder optionsBuilder;
- private List<PartitionDefinition> partitions;
- private DorisReadOptions readOptions;
-
-
- public Builder() {
- this.optionsBuilder = DorisOptions.builder();
- }
-
- public Builder setFenodes(String fenodes) {
- this.optionsBuilder.setFenodes(fenodes);
- return this;
- }
-
- public Builder setUsername(String username) {
- this.optionsBuilder.setUsername(username);
- return this;
- }
-
- public Builder setPassword(String password) {
- this.optionsBuilder.setPassword(password);
- return this;
- }
-
- public Builder setTableIdentifier(String tableIdentifier) {
- this.optionsBuilder.setTableIdentifier(tableIdentifier);
- return this;
- }
-
- public Builder setPartitions(List<PartitionDefinition> partitions) {
- this.partitions = partitions;
- return this;
- }
-
- public Builder setReadOptions(DorisReadOptions readOptions) {
- this.readOptions = readOptions;
- return this;
- }
-
- public DorisRowDataInputFormat build() {
- return new DorisRowDataInputFormat(
- optionsBuilder.build(),partitions,readOptions
- );
- }
- }
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
+
+ private DorisOptions options;
+ private DorisReadOptions readOptions;
+ private List<PartitionDefinition> dorisPartitions;
+ private TypeInformation<RowData> rowDataTypeInfo;
+
+ private ScalaValueReader scalaValueReader;
+ private transient boolean hasNext;
+
+ public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions) {
+ this.options = options;
+ this.dorisPartitions = dorisPartitions;
+ this.readOptions = readOptions;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {
+ //do nothing here
+ }
+
+ @Override
+ public void openInputFormat() {
+ //called once per inputFormat (on open)
+ }
+
+ @Override
+ public void closeInputFormat() {
+ //called once per inputFormat (on close)
+ }
+
+ /**
+ * Opens a reader for the given input split by creating a
+ * {@link ScalaValueReader} for the split's Doris partition, using the
+ * configured connection and read options.
+ *
+ * @param inputSplit the split describing the Doris partition to read
+ * @throws IOException if there's an error while opening the reader
+ */
+ @Override
+ public void open(DorisTableInputSplit inputSplit) throws IOException {
+ scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions);
+ hasNext = scalaValueReader.hasNext();
+ }
+
+ /**
+ * Closes all resources used.
+ *
+ * @throws IOException Indicates that a resource could not be closed.
+ */
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return rowDataTypeInfo;
+ }
+
+ /**
+ * Checks whether all data has been read.
+ *
+ * @return boolean value indicating whether all data has been read.
+ * @throws IOException
+ */
+ @Override
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ /**
+ * Stores the next resultSet row in a tuple.
+ *
+ * @param reuse row to be reused.
+ * @return row containing next {@link RowData}
+ * @throws IOException
+ */
+ @Override
+ public RowData nextRecord(RowData reuse) throws IOException {
+ if (!hasNext) {
+ return null;
+ }
+ List next = (List) scalaValueReader.next();
+ GenericRowData genericRowData = new GenericRowData(next.size());
+ for (int i = 0; i < next.size(); i++) {
+ genericRowData.setField(i, next.get(i));
+ }
+ //update hasNext after we've read the record
+ hasNext = scalaValueReader.hasNext();
+ return genericRowData;
+ }
+
+ @Override
+ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+ return cachedStatistics;
+ }
+
+ @Override
+ public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+ List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
+ int splitNum = 0;
+ for (PartitionDefinition partition : dorisPartitions) {
+ dorisSplits.add(new DorisTableInputSplit(splitNum++, partition));
+ }
+ LOG.info("DorisTableInputSplit Num:{}", dorisSplits.size());
+ return dorisSplits.toArray(new DorisTableInputSplit[0]);
+ }
+
+ @Override
+ public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) {
+ return new DefaultInputSplitAssigner(inputSplits);
+ }
+
+ /**
+ * A builder used to set parameters to the input format's configuration in a fluent way.
+ *
+ * @return builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link DorisRowDataInputFormat}.
+ */
+ public static class Builder {
+ private DorisOptions.Builder optionsBuilder;
+ private List<PartitionDefinition> partitions;
+ private DorisReadOptions readOptions;
+
+
+ public Builder() {
+ this.optionsBuilder = DorisOptions.builder();
+ }
+
+ public Builder setFenodes(String fenodes) {
+ this.optionsBuilder.setFenodes(fenodes);
+ return this;
+ }
+
+ public Builder setUsername(String username) {
+ this.optionsBuilder.setUsername(username);
+ return this;
+ }
+
+ public Builder setPassword(String password) {
+ this.optionsBuilder.setPassword(password);
+ return this;
+ }
+
+ public Builder setTableIdentifier(String tableIdentifier) {
+ this.optionsBuilder.setTableIdentifier(tableIdentifier);
+ return this;
+ }
+
+ public Builder setPartitions(List<PartitionDefinition> partitions) {
+ this.partitions = partitions;
+ return this;
+ }
+
+ public Builder setReadOptions(DorisReadOptions readOptions) {
+ this.readOptions = readOptions;
+ return this;
+ }
+
+ public DorisRowDataInputFormat build() {
+ return new DorisRowDataInputFormat(
+ optionsBuilder.build(), partitions, readOptions
+ );
+ }
+ }
}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index ef16f33..b9a7708 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -17,6 +17,7 @@
package org.apache.doris.flink.table;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.slf4j.Logger;
@@ -31,17 +32,21 @@ import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
+import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
/**
* DorisStreamLoad
**/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -54,8 +59,9 @@ public class DorisStreamLoad implements Serializable{
private String db;
private String tbl;
private String authEncoding;
+ private Properties streamLoadProp;
- public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
+ public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) {
this.hostPort = hostPort;
this.db = db;
this.tbl = tbl;
@@ -63,6 +69,7 @@ public class DorisStreamLoad implements Serializable{
this.passwd = passwd;
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ this.streamLoadProp = streamLoadProp;
}
public String getLoadUrlStr() {
@@ -89,6 +96,9 @@ public class DorisStreamLoad implements Serializable{
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
+ for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
+ conn.addRequestProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
+ }
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
@@ -104,6 +114,7 @@ public class DorisStreamLoad implements Serializable{
this.respMsg = respMsg;
this.respContent = respContent;
}
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -116,14 +127,14 @@ public class DorisStreamLoad implements Serializable{
public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
- LOG.info("Streamload Response:{}",loadResponse);
- if(loadResponse.status != 200){
+ LOG.info("Streamload Response:{}", loadResponse);
+ if (loadResponse.status != 200) {
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
- }else{
+ } else {
ObjectMapper obj = new ObjectMapper();
try {
RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
- if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
+ if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
throw new StreamLoadException("stream load error: " + respContent.getMessage());
}
} catch (IOException e) {
@@ -133,11 +144,13 @@ public class DorisStreamLoad implements Serializable{
}
private LoadResponse loadBatch(String value) {
- Calendar calendar = Calendar.getInstance();
- String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
- calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
- calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
- UUID.randomUUID().toString().replaceAll("-", ""));
+ String label = streamLoadProp.getProperty("label");
+ if (StringUtils.isBlank(label)) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss");
+ String formatDate = sdf.format(new Date());
+ label = String.format("flink_connector_%s_%s",formatDate,
+ UUID.randomUUID().toString().replaceAll("-", ""));
+ }
HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
index f245dac..5e81cc1 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
@@ -22,14 +22,16 @@ import org.apache.flink.core.io.InputSplit;
/**
* DorisTableInputSplit
**/
-public class DorisTableInputSplit implements InputSplit, java.io.Serializable{
+public class DorisTableInputSplit implements InputSplit, java.io.Serializable {
- /** The number of the split. */
+ /**
+ * The number of the split.
+ */
private final int splitNumber;
protected final PartitionDefinition partition;
- public DorisTableInputSplit(int splitNumber,PartitionDefinition partition) {
+ public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) {
super();
this.splitNumber = splitNumber;
this.partition = partition;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org