Posted to commits@doris.apache.org by mo...@apache.org on 2020/09/07 02:05:59 UTC

[incubator-doris] 04/04: [Spark Load][Bug] Keep the column splitting in spark load consistent with broker load / mini load (#4532)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.13
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit bbefdadc9a9225e96a28940c2477c16904594aae
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Sun Sep 6 20:33:26 2020 +0800

    [Spark Load][Bug] Keep the column splitting in spark load consistent with broker load / mini load (#4532)
---
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 23 +++++++++++++++++++++-
 .../apache/doris/load/loadv2/etl/EtlJobConfig.java |  9 +--------
 2 files changed, 23 insertions(+), 9 deletions(-)
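
Why the old splitting was inconsistent: String.split() interprets its argument
as a regular expression and drops trailing empty fields, while broker load /
mini load split on the raw separator character and keep every field. A minimal,
self-contained sketch of the difference (not part of the commit; the class name
is hypothetical, and splitLine here is a compact equivalent of the method added
below, minus its null/empty-line guard):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SplitConsistencyDemo {
        // Char-wise splitting: every separator starts a new field, and
        // leading/trailing empty fields are kept.
        static String[] splitLine(String line, char sep) {
            List<String> values = new ArrayList<>();
            int lastIndex = 0;
            for (int i = 0; i < line.length(); i++) {
                if (line.charAt(i) == sep) {
                    values.add(line.substring(lastIndex, i));
                    lastIndex = i + 1;
                }
            }
            values.add(line.substring(lastIndex));
            return values.toArray(new String[0]);
        }

        public static void main(String[] args) {
            String line = "a|b||";
            // Old spark load behavior: regex split; "|" is a metacharacter
            // (hence the escaping removed from EtlJobConfig below), and
            // trailing empty fields are dropped, so a 4-column row is misread
            // as 2 columns and rejected by the column-count check.
            System.out.println(Arrays.toString(line.split("\\|")));        // [a, b]
            // New behavior, matching broker load / mini load:
            System.out.println(Arrays.toString(splitLine(line, '|')));     // [a, b, , ]
        }
    }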

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 6e5a714..fd71add 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -62,6 +62,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -553,12 +554,13 @@ public final class SparkDpp implements java.io.Serializable {
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
             parsers.add(ColumnParser.create(column));
         }
+        char separator = (char)fileGroup.columnSeparator.getBytes(Charset.forName("UTF-8"))[0];
         // now we first support csv file
         // TODO: support parquet file and orc file
         JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
                 record -> {
                     scannedRowsAcc.add(1);
-                    String[] attributes = record.split(fileGroup.columnSeparator);
+                    String[] attributes = splitLine(record, separator);
                     List<Row> result = new ArrayList<>();
                     boolean validRow = true;
                     if (attributes.length != columnSize) {
@@ -640,6 +642,25 @@ public final class SparkDpp implements java.io.Serializable {
         return srcSchema;
     }
 
+    // This method keeps the splitting consistent with broker load / mini load
+    private String[] splitLine(String line, char sep) {
+        if (line == null || line.equals("")) {
+            return new String[0];
+        }
+        int index = 0;
+        int lastIndex = 0;
+        // line-begin char and line-end char are considered to be 'delimiter'
+        List<String> values = new ArrayList<>();
+        for (int i = 0; i < line.length(); i++, index++) {
+            if (line.charAt(index) == sep) {
+                values.add(line.substring(lastIndex, index));
+                lastIndex = index + 1;
+            }
+        }
+        values.add(line.substring(lastIndex, index));
+        return values.toArray(new String[0]);
+    }
+
     // partition keys will be parsed into double from json
     // so need to convert it to partition columns' type
     private Object convertPartitionKey(Object srcValue, Class dstClass) throws SparkDppException {
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 9ee4d83..8238aea 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -503,20 +503,13 @@ public class EtlJobConfig implements Serializable {
             this.filePaths = filePaths;
             this.fileFieldNames = fileFieldNames;
             this.columnsFromPath = columnsFromPath;
+            this.columnSeparator = Strings.isNullOrEmpty(columnSeparator) ? "\t" : columnSeparator;
             this.lineDelimiter = lineDelimiter;
             this.isNegative = isNegative;
             this.fileFormat = fileFormat;
             this.columnMappings = columnMappings;
             this.where = where;
             this.partitions = partitions;
-
-            // Convert some special characters in column separator
-            char sep = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
-            if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
-                this.columnSeparator = new String(new char[]{'\\', sep});
-            } else {
-                this.columnSeparator = Character.toString(sep);
-            }
         }
 
         // for data from table
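
The regex escaping removed above existed only because the separator was later
passed to String.split(). Now that SparkDpp takes the first byte of
columnSeparator directly, keeping the escaping would silently change the
delimiter. A hedged sketch of why (class name hypothetical, not part of the
commit):

    import java.nio.charset.StandardCharsets;

    public class SeparatorByteDemo {
        public static void main(String[] args) {
            String escaped = "\\|"; // what the old constructor stored for "|"
            String plain = "|";     // what the constructor stores after this patch
            // SparkDpp uses the first byte of columnSeparator as the delimiter:
            System.out.println((char) escaped.getBytes(StandardCharsets.UTF_8)[0]); // prints '\'
            System.out.println((char) plain.getBytes(StandardCharsets.UTF_8)[0]);   // prints '|'
        }
    }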


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org