You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/25 05:49:35 UTC

[incubator-doris-flink-connector] branch master updated: fix flink schema and doris schema column mapping (#30)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 73299bf  fix flink schema and doris schema column mapping (#30)
73299bf is described below

commit 73299bf4aa40a98e6e5b64662abb1b587a6ee4bd
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 25 13:49:30 2022 +0800

    fix flink schema and doris schema column mapping (#30)
    
    fix flink schema and doris schema column mapping
---
 .../main/java/org/apache/doris/flink/cfg/DorisReadOptions.java    | 8 ++++++++
 .../doris/flink/deserialization/converter/DorisRowConverter.java  | 4 ++--
 .../org/apache/doris/flink/table/DorisDynamicTableSource.java     | 7 ++++++-
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index aead8a8..9733bfd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -104,6 +104,14 @@ public class DorisReadOptions implements Serializable {
         return useOldApi;
     }
 
+    public void setReadFields(String readFields) {
+        this.readFields = readFields;
+    }
+
+    public void setFilterQuery(String filterQuery) {
+        this.filterQuery = filterQuery;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index b84fcdb..ee7a9a7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -50,8 +50,8 @@ public class DorisRowConverter implements Serializable {
      * @param record from rowBatch
      */
     public GenericRowData convert(List record){
-        GenericRowData rowData = new GenericRowData(record.size());
-        for (int i = 0; i < record.size(); i++) {
+        GenericRowData rowData = new GenericRowData(deserializationConverters.length);
+        for (int i = 0; i < deserializationConverters.length ; i++) {
             rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
         }
         return rowData;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 0af8ad5..20ce4f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -38,11 +38,12 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * The {@link DorisDynamicTableSource} is used during planning.
@@ -76,6 +77,10 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
+                .map(item->String.format("`%s`", item.trim().replace("`", "")))
+                .collect(Collectors.joining(", ")));
+
         if (readOptions.getUseOldApi()) {
             List<PartitionDefinition> dorisPartitions;
             try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org