You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/25 05:49:12 UTC
[incubator-doris-flink-connector] branch branch-for-flink-before-1.13 updated: [Bug] fix flink schema and doris schema column not match (#29)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push:
new 6229363 [Bug] fix flink schema and doris schema column not match (#29)
6229363 is described below
commit 6229363bbf873840d009765f50fd275153399ac4
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 25 13:49:08 2022 +0800
[Bug] fix flink schema and doris schema column not match (#29)
* Fix mismatch between the Flink schema columns and the Doris schema columns
---
.../src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java | 7 +++++++
.../java/org/apache/doris/flink/table/DorisDynamicTableSource.java | 7 +++++++
.../java/org/apache/doris/flink/table/DorisRowDataInputFormat.java | 3 +++
.../src/test/java/org/apache/doris/flink/DorisSourceExample.java | 1 +
4 files changed, 18 insertions(+)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 0beb18c..d21ccaa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -98,6 +98,13 @@ public class DorisReadOptions implements Serializable {
return deserializeArrowAsync;
}
+ public void setReadFields(String readFields) {
+ this.readFields = readFields;
+ }
+
+ public void setFilterQuery(String filterQuery) {
+ this.filterQuery = filterQuery;
+ }
public static Builder builder() {
return new Builder();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 689aa47..37ee11b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -38,7 +38,9 @@ import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
/**
* The {@link DorisDynamicTableSource} is used during planning.
@@ -70,8 +72,13 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
+ .map(item->String.format("`%s`", item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
+
List<PartitionDefinition> dorisPartitions;
try {
+ //request doris query plan
dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
} catch (DorisException e) {
throw new RuntimeException("Failed fetch doris partitions");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index eeb63ba..c353de3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -158,6 +158,9 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
}
private Object deserialize(LogicalType type, Object val) {
+ if(val == null){
+ return null;
+ }
switch (type.getTypeRoot()) {
case DECIMAL:
final DecimalType decimalType = ((DecimalType) type);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index 35857dc..0abdf41 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -50,6 +50,7 @@ public class DorisSourceExample {
" 'connector' = 'doris',\n" +
" 'fenodes' = 'FE_IP:8030',\n" +
" 'table.identifier' = 'db.table',\n" +
+ " 'doris.filter.query' = 'bigint_1=1',\n" +
" 'username' = 'root',\n" +
" 'password' = ''\n" +
")");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org