Posted to commits@doris.apache.org by wa...@apache.org on 2021/01/27 03:17:16 UTC

[incubator-doris] branch master updated: (#5224) some small fixes for spark load (#5233)

This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 41ef9cc  (#5224) some small fixes for spark load (#5233)
41ef9cc is described below

commit 41ef9ccda977b22c8d9f234869194f2be0462afb
Author: wangbo <50...@qq.com>
AuthorDate: Wed Jan 27 11:16:59 2021 +0800

    (#5224) some small fixes for spark load (#5233)
    
    * (#5224) some small fixes for spark load
    
    * 1. use yyyy-MM-dd instead of YYYY-MM-DD (see the sketch below)
    2. unify lowercase for bitmap column names
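
    For context on fix 1: in joda-time patterns (used by the new formatters
    in ColumnParser), 'y' is the calendar year and 'd' the day of month,
    while 'Y' is the year of era and 'D' the day of YEAR, so "YYYY-MM-DD"
    silently formats and parses the wrong fields. A minimal sketch, not part
    of the patch, illustrating the pitfall:

        import org.joda.time.LocalDate;
        import org.joda.time.format.DateTimeFormat;

        public class PatternPitfall {
            public static void main(String[] args) {
                LocalDate feb10 = new LocalDate(2021, 2, 10); // day 41 of 2021
                // yyyy-MM-dd: year, month, day of month -> 2021-02-10
                System.out.println(DateTimeFormat.forPattern("yyyy-MM-dd").print(feb10));
                // YYYY-MM-DD: year of era, month, day of year -> 2021-02-41
                System.out.println(DateTimeFormat.forPattern("YYYY-MM-DD").print(feb10));
            }
        }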
---
 .../apache/doris/load/loadv2/dpp/ColumnParser.java | 12 +++--
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 55 ++++++++++++++++++----
 .../apache/doris/load/loadv2/etl/SparkEtlJob.java  |  9 +++-
 3 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index c9d6a42..1547191 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -21,18 +21,22 @@ import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread-safe formatters: joda-time formatters are immutable and can be shared across threads
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd");
+    public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+
     public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException {
         String columnType = etlColumn.columnType;
         if (columnType.equalsIgnoreCase("TINYINT")) {
@@ -158,7 +162,7 @@ class DateParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            Date.parse(value);
+            DATE_FORMATTER.parseDateTime(value);
         } catch (IllegalArgumentException e) {
             return false;
         }
@@ -170,7 +174,7 @@ class DatetimeParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         try {
-            DateTime.parse(value);
+            DATE_TIME_FORMATTER.parseDateTime(value);
         } catch (IllegalArgumentException e) {
             return false;
         }
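
For context: the removed java.util.Date.parse is deprecated and accepts a
loosely specified set of formats, while the shared joda-time formatters above
are immutable (hence thread safe) and reject anything that does not match the
pattern exactly. A minimal standalone sketch, assuming only joda-time on the
classpath, of the check the new DateParser performs:

    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class DateCheckSketch {
        // immutable, so safe to share across threads and Spark tasks
        private static final DateTimeFormatter DATE = DateTimeFormat.forPattern("yyyy-MM-dd");

        static boolean isValidDate(String value) {
            try {
                DATE.parseDateTime(value); // throws IllegalArgumentException on mismatch
                return true;
            } catch (IllegalArgumentException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            System.out.println(isValidDate("2021-01-27")); // true
            System.out.println(isValidDate("2021/01/27")); // false: wrong separators
            System.out.println(isValidDate("2021-13-01")); // false: no month 13
        }
    }
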
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 2af4de4..b30d102 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -108,13 +108,17 @@ public final class SparkDpp implements java.io.Serializable {
     // we need to wrap it so that we can use it in executor.
     private SerializableConfiguration serializableHadoopConf;
     private DppResult dppResult = new DppResult();
+    Map<Long, Set<String>> tableToBitmapDictColumns = new HashMap<>();
 
     // just for ut
     public SparkDpp() {}
 
-    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig, Map<Long, Set<String>> tableToBitmapDictColumns) {
         this.spark = spark;
         this.etlJobConfig = etlJobConfig;
+        if (tableToBitmapDictColumns != null) {
+            this.tableToBitmapDictColumns = tableToBitmapDictColumns;
+        }
     }
 
     public void init() {
@@ -543,7 +547,9 @@ public final class SparkDpp implements java.io.Serializable {
                 }
             }
             if (column.columnType.equalsIgnoreCase("DATE")) {
-                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
+                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.DateType));
+            } else if (column.columnType.equalsIgnoreCase("DATETIME")) {
+                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.TimestampType));
             } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
                 dataframe = dataframe.withColumn(dstField.name(),
                         functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
@@ -844,7 +850,8 @@ public final class SparkDpp implements java.io.Serializable {
                                                String hiveDbTableName,
                                                EtlJobConfig.EtlIndex baseIndex,
                                                EtlJobConfig.EtlFileGroup fileGroup,
-                                               StructType dstTableSchema) throws SparkDppException {
+                                               StructType dstTableSchema,
+                                               Set<String> dictBitmapColumnSet) throws SparkDppException {
         // select base index columns from hive table
         StringBuilder sql = new StringBuilder();
         sql.append("select ");
@@ -857,18 +864,39 @@ public final class SparkDpp implements java.io.Serializable {
         }
 
         Dataset<Row> dataframe = spark.sql(sql.toString());
+        // Note(wb): the current spark load implementation cannot be fully consistent with the doris BE. The reason is as follows:
+        // for stream load, the doris BE runs these steps:
+        // step 1: type check
+        // step 2: expression calculation
+        // step 3: strict mode check
+        // step 4: nullable column check
+        // The BE can run all four steps row by row,
+        // but spark load relies on spark to do step 2, so each step can only be applied to the whole dataset in turn;
+        // therefore, in spark load we first do steps 1, 3 and 4, and then do step 2.
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
-                    dstTableSchema);
+                dstTableSchema, dictBitmapColumnSet);
         dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
         return dataframe;
     }
 
     private Dataset<Row> checkDataFromHiveWithStrictMode(
-            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException {
+            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema,
+            Set<String> dictBitmapColumnSet) throws SparkDppException {
         List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
         List<ColumnParser> columnParserArrayList = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+            // note(wb): there are three data sources for a bitmap column
+            // case 1: global dict; no check needed
+            // case 2: bitmap hash function; not supported in spark load yet, so it is ignored here
+            // case 3: the origin value is an integer; it should be checked with a BigIntParser
+            if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
+                if (dictBitmapColumnSet.contains(column.columnName.toLowerCase())) {
+                    continue;
+                } else {
+                    columnNameNeedCheckArrayList.add(column);
+                    columnParserArrayList.add(new BigIntParser());
+                }
+            } else if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
                     !StringUtils.equalsIgnoreCase(column.columnType, "char") &&
                     !mappingColKeys.contains(column.columnName)) {
                 columnNameNeedCheckArrayList.add(column);
@@ -879,6 +907,7 @@ public final class SparkDpp implements java.io.Serializable {
         ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[columnParserArrayList.size()]);
         EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[columnNameNeedCheckArrayList.size()]);
 
+        StructType srcSchema = dataframe.schema();
         JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() {
             @Override
             public Iterator<Row> call(Row row) throws Exception {
@@ -898,6 +927,11 @@ public final class SparkDpp implements java.io.Serializable {
                         if (isStrictMode) {
                             validRow = false;
+                            LOG.warn(String.format("failed to parse row in strict mode, column name %s, src row %s", column.columnName, row.toString()));
+                        // a column that fails to parse is filled with null, but if the doris column does not allow null we should skip this row
+                        } else if (!column.isAllowNull) {
+                            validRow = false;
+                            LOG.warn("column:" + i + " can not be null. row:" + row.toString());
+                            break;
                         } else {
                             columnIndexNeedToRepalceNull.add(fieldIndex);
                         }
@@ -909,7 +943,7 @@ public final class SparkDpp implements java.io.Serializable {
                     if (abnormalRowAcc.value() <= 5) {
                         invalidRows.add(row.toString());
                     }
-                } if (columnIndexNeedToRepalceNull.size() != 0) {
+                } else if (columnIndexNeedToRepalceNull.size() != 0) {
                     Object[] newRow = new Object[row.size()];
                     for (int i = 0; i < row.size(); i++) {
                         if (columnIndexNeedToRepalceNull.contains(i)) {
@@ -926,7 +960,8 @@ public final class SparkDpp implements java.io.Serializable {
             }
         });
 
-        return spark.createDataFrame(result, dstTableSchema);
+        // here we only validate the data without casting it, so the data types must stay the same as the src schema, i.e. the hive table schema
+        return spark.createDataFrame(result, srcSchema);
     }
 
     private void process() throws Exception {
@@ -934,6 +969,7 @@ public final class SparkDpp implements java.io.Serializable {
             for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
                 Long tableId = entry.getKey();
                 EtlJobConfig.EtlTable etlTable = entry.getValue();
+                Set<String> dictBitmapColumnSet = tableToBitmapDictColumns.getOrDefault(tableId, new HashSet<>());
 
                 // get the base index meta
                 EtlJobConfig.EtlIndex baseIndex = null;
@@ -982,7 +1018,8 @@ public final class SparkDpp implements java.io.Serializable {
                     if (sourceType == EtlJobConfig.SourceType.FILE) {
                         fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
                     } else if (sourceType == EtlJobConfig.SourceType.HIVE) {
-                        fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema);
+                        fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema,
+                                dictBitmapColumnSet);
                     } else {
                         throw new RuntimeException("Unknown source type: " + sourceType.name());
                     }
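
A note on the DATE/DATETIME change above: casting through the concrete Spark
SQL types (DataTypes.DateType / DataTypes.TimestampType) replaces the string
cast name "date" and adds the previously missing DATETIME-to-timestamp cast.
A self-contained sketch, not taken from the patch, of the same pattern:

    import static org.apache.spark.sql.functions.col;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;

    public class CastSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[1]").appName("cast-sketch").getOrCreate();
            Dataset<Row> df = spark.sql(
                    "select '2021-01-27' as dt, '2021-01-27 11:16:59' as ts");
            // cast the string columns to proper date/timestamp types
            df = df.withColumn("dt", col("dt").cast(DataTypes.DateType))
                   .withColumn("ts", col("ts").cast(DataTypes.TimestampType));
            df.printSchema(); // dt: date, ts: timestamp
            spark.stop();
        }
    }
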
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 6fe8291..86bae31 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2.etl;
 
+import org.apache.doris.common.SparkDppException;
 import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
 import org.apache.doris.load.loadv2.dpp.SparkDpp;
 import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
@@ -53,6 +54,7 @@ public class SparkEtlJob {
 
     private static final String BITMAP_DICT_FUNC = "bitmap_dict";
     private static final String TO_BITMAP_FUNC = "to_bitmap";
+    private static final String BITMAP_HASH = "bitmap_hash";
 
     private String jobConfigFilePath;
     private EtlJobConfig etlJobConfig;
@@ -112,8 +114,11 @@ public class SparkEtlJob {
                     String columnName = mappingEntry.getKey();
                     String exprStr = mappingEntry.getValue().toDescription();
                     String funcName = functions.expr(exprStr).expr().prettyName();
+                    if (funcName.equalsIgnoreCase(BITMAP_HASH)) {
+                        throw new SparkDppException("spark load does not support bitmap_hash yet");
+                    }
                     if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
-                        bitmapDictColumns.add(columnName);
+                        bitmapDictColumns.add(columnName.toLowerCase());
                     } else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
                         newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue());
                     }
@@ -137,7 +142,7 @@ public class SparkEtlJob {
     }
 
     private void processDpp() throws Exception {
-        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
+        SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig, tableToBitmapDictColumns);
         sparkDpp.init();
         sparkDpp.doDpp();
     }
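
A note on the SparkEtlJob changes: the mapping function is identified by
letting Spark's SQL parser name the (unresolved) expression, bitmap_hash is
now rejected up front, and dict column names are lower-cased so that the
dictBitmapColumnSet.contains(column.columnName.toLowerCase()) lookup in
SparkDpp matches regardless of the case used in the load statement. A rough
sketch, assuming (as the existing code does) that prettyName() of an
unresolved function returns the function name itself:

    import org.apache.spark.sql.functions;

    public class MappingFuncSketch {
        public static void main(String[] args) {
            // the parser accepts unregistered function names without resolving them
            String funcName = functions.expr("bitmap_dict(User_Id)").expr().prettyName();
            if (funcName.equalsIgnoreCase("bitmap_hash")) {
                throw new IllegalStateException("spark load does not support bitmap_hash yet");
            }
            if (funcName.equalsIgnoreCase("bitmap_dict")) {
                // normalize once here so every later set lookup can assume lower case
                System.out.println("dict column: " + "User_Id".toLowerCase());
            }
        }
    }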

