You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by wa...@apache.org on 2021/01/27 03:17:16 UTC
[incubator-doris] branch master updated: (#5224)some little fix for
spark load (#5233)
This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 41ef9cc (#5224)some little fix for spark load (#5233)
41ef9cc is described below
commit 41ef9ccda977b22c8d9f234869194f2be0462afb
Author: wangbo <50...@qq.com>
AuthorDate: Wed Jan 27 11:16:59 2021 +0800
(#5224)some little fix for spark load (#5233)
* (#5224)some little fix for spark load
* 1 use yyyy-MM-dd instead of YYYY-MM-DD
2 unify lower case for bitmap column name
---
.../apache/doris/load/loadv2/dpp/ColumnParser.java | 12 +++--
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 55 ++++++++++++++++++----
.../apache/doris/load/loadv2/etl/SparkEtlJob.java | 9 +++-
3 files changed, 61 insertions(+), 15 deletions(-)
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index c9d6a42..1547191 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -21,18 +21,22 @@ import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.util.Date;
// Parser to validate value for different type
public abstract class ColumnParser implements Serializable {
protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
+ // thread safe formatter
+ public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd");
+ public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
+
public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException {
String columnType = etlColumn.columnType;
if (columnType.equalsIgnoreCase("TINYINT")) {
@@ -158,7 +162,7 @@ class DateParser extends ColumnParser {
@Override
public boolean parse(String value) {
try {
- Date.parse(value);
+ DATE_FORMATTER.parseDateTime(value);
} catch (IllegalArgumentException e) {
return false;
}
@@ -170,7 +174,7 @@ class DatetimeParser extends ColumnParser {
@Override
public boolean parse(String value) {
try {
- DateTime.parse(value);
+ DATE_TIME_FORMATTER.parseDateTime(value);
} catch (IllegalArgumentException e) {
return false;
}
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 2af4de4..b30d102 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -108,13 +108,17 @@ public final class SparkDpp implements java.io.Serializable {
// we need to wrap it so that we can use it in executor.
private SerializableConfiguration serializableHadoopConf;
private DppResult dppResult = new DppResult();
+ Map<Long, Set<String>> tableToBitmapDictColumns = new HashMap<>();
// just for ut
public SparkDpp() {}
- public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+ public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig, Map<Long, Set<String>> tableToBitmapDictColumns) {
this.spark = spark;
this.etlJobConfig = etlJobConfig;
+ if (tableToBitmapDictColumns != null) {
+ this.tableToBitmapDictColumns = tableToBitmapDictColumns;
+ }
}
public void init() {
@@ -543,7 +547,9 @@ public final class SparkDpp implements java.io.Serializable {
}
}
if (column.columnType.equalsIgnoreCase("DATE")) {
- dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
+ dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.DateType));
+ } else if (column.columnType.equalsIgnoreCase("DATETIME")) {
+ dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(DataTypes.TimestampType));
} else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
dataframe = dataframe.withColumn(dstField.name(),
functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
@@ -844,7 +850,8 @@ public final class SparkDpp implements java.io.Serializable {
String hiveDbTableName,
EtlJobConfig.EtlIndex baseIndex,
EtlJobConfig.EtlFileGroup fileGroup,
- StructType dstTableSchema) throws SparkDppException {
+ StructType dstTableSchema,
+ Set<String> dictBitmapColumnSet) throws SparkDppException {
// select base index columns from hive table
StringBuilder sql = new StringBuilder();
sql.append("select ");
@@ -857,18 +864,39 @@ public final class SparkDpp implements java.io.Serializable {
}
Dataset<Row> dataframe = spark.sql(sql.toString());
+ // Note(wb): in the current spark load implementation, spark load can't be fully consistent with doris BE; the reason is as follows
+ // For stream load in doris BE, it runs as follow steps:
+ // step 1: type check
+ // step 2: expression calculation
+ // step 3: strict mode check
+ // step 4: nullable column check
+ // BE can do the four steps row by row
+ // but spark load relies on spark to do step2, so it can only do step 1 for whole dataset and then do step 2 for whole dataset and so on;
+ // So in spark load, we first do step 1,3,4,and then do step 2.
dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
- dstTableSchema);
+ dstTableSchema, dictBitmapColumnSet);
dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
return dataframe;
}
private Dataset<Row> checkDataFromHiveWithStrictMode(
- Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException {
+ Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema,
+ Set<String> dictBitmapColumnSet) throws SparkDppException {
List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
List<ColumnParser> columnParserArrayList = new ArrayList<>();
for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
- if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+ // note(wb): there are three data source for bitmap column
+ // case 1: global dict; needn't check
+ // case 2: bitmap hash function; this func is not supported in spark load now, so ignore it here
+ // case 3: origin value is an integer value; it should be checked using LongParser
+ if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
+ if (dictBitmapColumnSet.contains(column.columnName.toLowerCase())) {
+ continue;
+ } else {
+ columnNameNeedCheckArrayList.add(column);
+ columnParserArrayList.add(new BigIntParser());
+ }
+ } else if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
!StringUtils.equalsIgnoreCase(column.columnType, "char") &&
!mappingColKeys.contains(column.columnName)) {
columnNameNeedCheckArrayList.add(column);
@@ -879,6 +907,7 @@ public final class SparkDpp implements java.io.Serializable {
ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[columnParserArrayList.size()]);
EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[columnNameNeedCheckArrayList.size()]);
+ StructType srcSchema = dataframe.schema();
JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
@@ -898,6 +927,11 @@ public final class SparkDpp implements java.io.Serializable {
if (isStrictMode) {
validRow = false;
LOG.warn(String.format("row parsed failed in strict mode, column name %s, src row %s", column.columnName, row.toString()));
+ // a column that failed to parse would be filled with null, but if the doris column does not allow null, we should skip this row
+ } else if (!column.isAllowNull) {
+ validRow = false;
+ LOG.warn("column:" + i + " can not be null. row:" + row.toString());
+ break;
} else {
columnIndexNeedToRepalceNull.add(fieldIndex);
}
@@ -909,7 +943,7 @@ public final class SparkDpp implements java.io.Serializable {
if (abnormalRowAcc.value() <= 5) {
invalidRows.add(row.toString());
}
- } if (columnIndexNeedToRepalceNull.size() != 0) {
+ } else if (columnIndexNeedToRepalceNull.size() != 0) {
Object[] newRow = new Object[row.size()];
for (int i = 0; i < row.size(); i++) {
if (columnIndexNeedToRepalceNull.contains(i)) {
@@ -926,7 +960,8 @@ public final class SparkDpp implements java.io.Serializable {
}
});
- return spark.createDataFrame(result, dstTableSchema);
+ // here we just check data but not do cast, so data type should be same with src schema which is hive table schema
+ return spark.createDataFrame(result, srcSchema);
}
private void process() throws Exception {
@@ -934,6 +969,7 @@ public final class SparkDpp implements java.io.Serializable {
for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
Long tableId = entry.getKey();
EtlJobConfig.EtlTable etlTable = entry.getValue();
+ Set<String> dictBitmapColumnSet = tableToBitmapDictColumns.getOrDefault(tableId, new HashSet<>());
// get the base index meta
EtlJobConfig.EtlIndex baseIndex = null;
@@ -982,7 +1018,8 @@ public final class SparkDpp implements java.io.Serializable {
if (sourceType == EtlJobConfig.SourceType.FILE) {
fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
} else if (sourceType == EtlJobConfig.SourceType.HIVE) {
- fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema);
+ fileGroupDataframe = loadDataFromHiveTable(spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema,
+ dictBitmapColumnSet);
} else {
throw new RuntimeException("Unknown source type: " + sourceType.name());
}
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
index 6fe8291..86bae31 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/SparkEtlJob.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2.etl;
+import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.dpp.GlobalDictBuilder;
import org.apache.doris.load.loadv2.dpp.SparkDpp;
import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
@@ -53,6 +54,7 @@ public class SparkEtlJob {
private static final String BITMAP_DICT_FUNC = "bitmap_dict";
private static final String TO_BITMAP_FUNC = "to_bitmap";
+ private static final String BITMAP_HASH = "bitmap_hash";
private String jobConfigFilePath;
private EtlJobConfig etlJobConfig;
@@ -112,8 +114,11 @@ public class SparkEtlJob {
String columnName = mappingEntry.getKey();
String exprStr = mappingEntry.getValue().toDescription();
String funcName = functions.expr(exprStr).expr().prettyName();
+ if (funcName.equalsIgnoreCase(BITMAP_HASH)) {
+ throw new SparkDppException("spark load not support bitmap_hash now");
+ }
if (funcName.equalsIgnoreCase(BITMAP_DICT_FUNC)) {
- bitmapDictColumns.add(columnName);
+ bitmapDictColumns.add(columnName.toLowerCase());
} else if (!funcName.equalsIgnoreCase(TO_BITMAP_FUNC)) {
newColumnMappings.put(mappingEntry.getKey(), mappingEntry.getValue());
}
@@ -137,7 +142,7 @@ public class SparkEtlJob {
}
private void processDpp() throws Exception {
- SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig);
+ SparkDpp sparkDpp = new SparkDpp(spark, etlJobConfig, tableToBitmapDictColumns);
sparkDpp.init();
sparkDpp.doDpp();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org