You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/25 05:49:35 UTC
[incubator-doris-flink-connector] branch master updated: fix flink schema and doris schema column mapping (#30)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 73299bf fix flink schema and doris schema column mapping (#30)
73299bf is described below
commit 73299bf4aa40a98e6e5b64662abb1b587a6ee4bd
Author: wudi <67...@qq.com>
AuthorDate: Mon Apr 25 13:49:30 2022 +0800
fix flink schema and doris schema column mapping (#30)
fix flink schema and doris schema column mapping
---
.../main/java/org/apache/doris/flink/cfg/DorisReadOptions.java | 8 ++++++++
.../doris/flink/deserialization/converter/DorisRowConverter.java | 4 ++--
.../org/apache/doris/flink/table/DorisDynamicTableSource.java | 7 ++++++-
3 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index aead8a8..9733bfd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -104,6 +104,14 @@ public class DorisReadOptions implements Serializable {
return useOldApi;
}
+ public void setReadFields(String readFields) {
+ this.readFields = readFields;
+ }
+
+ public void setFilterQuery(String filterQuery) {
+ this.filterQuery = filterQuery;
+ }
+
public static Builder builder() {
return new Builder();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index b84fcdb..ee7a9a7 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -50,8 +50,8 @@ public class DorisRowConverter implements Serializable {
* @param record from rowBatch
*/
public GenericRowData convert(List record){
- GenericRowData rowData = new GenericRowData(record.size());
- for (int i = 0; i < record.size(); i++) {
+ GenericRowData rowData = new GenericRowData(deserializationConverters.length);
+ for (int i = 0; i < deserializationConverters.length ; i++) {
rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
}
return rowData;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 0af8ad5..20ce4f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -38,11 +38,12 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
/**
* The {@link DorisDynamicTableSource} is used during planning.
@@ -76,6 +77,10 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
+ .map(item->String.format("`%s`", item.trim().replace("`", "")))
+ .collect(Collectors.joining(", ")));
+
if (readOptions.getUseOldApi()) {
List<PartitionDefinition> dorisPartitions;
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org