You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/25 05:49:12 UTC

[incubator-doris-flink-connector] branch branch-for-flink-before-1.13 updated: [Bug] fix flink schema and doris schema column not match (#29)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch branch-for-flink-before-1.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/branch-for-flink-before-1.13 by this push:
     new 6229363  [Bug] fix flink schema and doris schema column not match (#29)
6229363 is described below

commit 6229363bbf873840d009765f50fd275153399ac4
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 25 13:49:08 2022 +0800

    [Bug] fix flink schema and doris schema column not match (#29)
    
    * fix flink schema and doris schema column not match
---
 .../src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java | 7 +++++++
 .../java/org/apache/doris/flink/table/DorisDynamicTableSource.java | 7 +++++++
 .../java/org/apache/doris/flink/table/DorisRowDataInputFormat.java | 3 +++
 .../src/test/java/org/apache/doris/flink/DorisSourceExample.java   | 1 +
 4 files changed, 18 insertions(+)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 0beb18c..d21ccaa 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -98,6 +98,13 @@ public class DorisReadOptions implements Serializable {
         return deserializeArrowAsync;
     }
 
+    public void setReadFields(String readFields) {
+        this.readFields = readFields;
+    }
+
+    public void setFilterQuery(String filterQuery) {
+        this.filterQuery = filterQuery;
+    }
 
     public static Builder builder() {
         return new Builder();
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 689aa47..37ee11b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -38,7 +38,9 @@ import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * The {@link DorisDynamicTableSource} is used during planning.
@@ -70,8 +72,13 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
+                .map(item->String.format("`%s`", item.trim().replace("`", "")))
+                .collect(Collectors.joining(", ")));
+
         List<PartitionDefinition> dorisPartitions;
         try {
+            //request doris query plan
             dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
         } catch (DorisException e) {
             throw new RuntimeException("Failed fetch doris partitions");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index eeb63ba..c353de3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -158,6 +158,9 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable
     }
 
     private Object deserialize(LogicalType type, Object val) {
+        if(val == null){
+            return null;
+        }
         switch (type.getTypeRoot()) {
             case DECIMAL:
                 final DecimalType decimalType = ((DecimalType) type);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
index 35857dc..0abdf41 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceExample.java
@@ -50,6 +50,7 @@ public class DorisSourceExample {
                         "  'connector' = 'doris',\n" +
                         "  'fenodes' = 'FE_IP:8030',\n" +
                         "  'table.identifier' = 'db.table',\n" +
+                        "  'doris.filter.query' = 'bigint_1=1',\n" +
                         "  'username' = 'root',\n" +
                         "  'password' = ''\n" +
                         ")");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org