Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/01/13 13:29:43 UTC

[GitHub] [incubator-doris] wangbo opened a new pull request #5233: (#5224)some little fix for spark load

wangbo opened a new pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233


   ## Proposed changes
   
   Some little fix for spark load
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [x] Bugfix (non-breaking change which fixes an issue)
   - [ ] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [ ] Documentation Update (if none of the other choices apply)
   - [ ] Code refactor (Modify the code structure, format the code, etc...)
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [x] I have created an issue (Fix #5224) and have described the bug/feature there in detail
   - [ ] Compiling and unit tests pass locally with my changes
   - [ ] I have added tests that prove my fix is effective or that my feature works
   - [ ] If this change needs a documentation change, I have updated the documentation
   - [ ] Any dependent changes have been merged
   



[GitHub] [incubator-doris] wangbo commented on a change in pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#discussion_r559574452



##########
File path: fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
##########
@@ -857,18 +864,39 @@ private Object convertPartitionKey(Object srcValue, Class dstClass) throws Spark
         }
 
         Dataset<Row> dataframe = spark.sql(sql.toString());
+        // Note(wb): in the current spark load implementation, spark load can't be fully consistent with Doris BE; the reason is as follows.
+        // For stream load, Doris BE runs the following steps:
+        // step 1: type check
+        // step 2: expression calculation
+        // step 3: strict mode check
+        // step 4: nullable column check
+        // BE can do these four steps row by row,
+        // but spark load relies on Spark to do step 2, so it can only do step 1 for the whole dataset, then step 2 for the whole dataset, and so on;
+        // so in spark load, we first do steps 1, 3, 4, and then do step 2.
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
-                    dstTableSchema);
+                dstTableSchema, dictBitmapColumnSet);
         dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
         return dataframe;
     }
 
     private Dataset<Row> checkDataFromHiveWithStrictMode(
-            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException {
+            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema,
+            Set<String> dictBitmapColumnSet) throws SparkDppException {
         List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
         List<ColumnParser> columnParserArrayList = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+            // note(wb): there are three data sources for a bitmap column
+            // case 1: global dict; no need to check
+            // case 2: bitmap hash function; this function is not supported in spark load yet, so ignore it here
+            // case 3: the origin value is an integer; it should be checked using LongParser
+            if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
+                if (dictBitmapColumnSet.contains(column.columnName)) {

Review comment:
       👌
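The comment in the diff above describes why spark load reorders the BE's row-by-row checks into whole-dataset passes. A minimal sketch of that ordering, using hypothetical helper names (checkTypesStrictModeAndNullables, applyColumnMappingExpressions) rather than the real SparkDpp methods, which are checkDataFromHiveWithStrictMode and convertSrcDataframeToDstDataframe:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative only: shows the pass ordering (steps 1, 3, 4 first, then step 2),
// each pass applied to the whole dataset instead of row by row.
class LoadOrderingSketch {
    Dataset<Row> loadFromHive(SparkSession spark, String sql) {
        Dataset<Row> df = spark.sql(sql);
        df = checkTypesStrictModeAndNullables(df); // steps 1, 3, 4 over the whole dataset
        df = applyColumnMappingExpressions(df);    // step 2 over the whole dataset
        return df;
    }

    // stand-ins for the real checks; present only to make the ordering explicit
    Dataset<Row> checkTypesStrictModeAndNullables(Dataset<Row> df) { return df; }
    Dataset<Row> applyColumnMappingExpressions(Dataset<Row> df) { return df; }
}
```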





[GitHub] [incubator-doris] wyb commented on a change in pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
wyb commented on a change in pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#discussion_r557872981



##########
File path: fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
##########
@@ -857,18 +864,39 @@ private Object convertPartitionKey(Object srcValue, Class dstClass) throws Spark
         }
 
         Dataset<Row> dataframe = spark.sql(sql.toString());
+        // Note(wb): in the current spark load implementation, spark load can't be fully consistent with Doris BE; the reason is as follows.
+        // For stream load, Doris BE runs the following steps:
+        // step 1: type check
+        // step 2: expression calculation
+        // step 3: strict mode check
+        // step 4: nullable column check
+        // BE can do these four steps row by row,
+        // but spark load relies on Spark to do step 2, so it can only do step 1 for the whole dataset, then step 2 for the whole dataset, and so on;
+        // so in spark load, we first do steps 1, 3, 4, and then do step 2.
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
-                    dstTableSchema);
+                dstTableSchema, dictBitmapColumnSet);
         dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
         return dataframe;
     }
 
     private Dataset<Row> checkDataFromHiveWithStrictMode(
-            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException {
+            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema,
+            Set<String> dictBitmapColumnSet) throws SparkDppException {
         List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
         List<ColumnParser> columnParserArrayList = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+            // note(wb): there are three data sources for a bitmap column
+            // case 1: global dict; no need to check
+            // case 2: bitmap hash function; this function is not supported in spark load yet, so ignore it here
+            // case 3: the origin value is an integer; it should be checked using LongParser
+            if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
+                if (dictBitmapColumnSet.contains(column.columnName)) {

Review comment:
       The column name should be converted to lowercase here.
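A hedged sketch of what this suggestion might look like in the loop from the diff above, assuming dictBitmapColumnSet stores lower-cased column names; the `continue` and the LongParser note are illustrative, not the actual patch:

```java
// Sketch only: lower-case the column name before probing the dict set.
if (StringUtils.equalsIgnoreCase(column.columnType, "bitmap")) {
    if (dictBitmapColumnSet.contains(column.columnName.toLowerCase())) {
        // case 1: value comes from the global dict, no further check needed
        continue;
    }
    // case 3: the origin value is an integer, so validate it with a LongParser
}
```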





[GitHub] [incubator-doris] caiconghui commented on a change in pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#discussion_r557012967



##########
File path: fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
##########
@@ -21,18 +21,22 @@
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread safe formatter
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("YYYY-MM-DD");

Review comment:
       ```suggestion
       public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-DD");
       ```





[GitHub] [incubator-doris] wangbo commented on a change in pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#discussion_r559574152



##########
File path: fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
##########
@@ -21,18 +21,22 @@
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread safe formatter
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("YYYY-MM-DD");

Review comment:
       Why is "yyyy-MM-DD" suggested?
   Is there any difference between them?





[GitHub] [incubator-doris] wangbo merged pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
wangbo merged pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233


   



[GitHub] [incubator-doris] caiconghui commented on a change in pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
caiconghui commented on a change in pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#discussion_r559653537



##########
File path: fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
##########
@@ -21,18 +21,22 @@
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread safe formatter
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("YYYY-MM-DD");

Review comment:
       @wangbo Sorry, it should be 'yyyy-MM-dd'. See https://zhuanlan.zhihu.com/p/101150248
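For context: in Joda-Time, 'y' is year and 'd' is day-of-month, while 'Y' is year-of-era and 'D' is day-of-year, so "YYYY-MM-DD" only happens to look correct for January dates. A small self-contained demo (the example date is arbitrary):

```java
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class JodaPatternDemo {
    public static void main(String[] args) {
        // Feb 5 is day-of-year 36, which exposes the 'DD' (day-of-year) mistake.
        DateTime date = new DateTime(2021, 2, 5, 0, 0);

        DateTimeFormatter wrong = DateTimeFormat.forPattern("YYYY-MM-DD"); // D = day-of-year
        DateTimeFormatter right = DateTimeFormat.forPattern("yyyy-MM-dd"); // d = day-of-month

        System.out.println(wrong.print(date)); // 2021-02-36
        System.out.println(right.print(date)); // 2021-02-05
    }
}
```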





[GitHub] [incubator-doris] wangbo commented on pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
wangbo commented on pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#issuecomment-767362178


   @morningman please review this pr



[GitHub] [incubator-doris] wangbo commented on a change in pull request #5233: (#5224)some little fix for spark load

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #5233:
URL: https://github.com/apache/incubator-doris/pull/5233#discussion_r559883191



##########
File path: fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
##########
@@ -21,18 +21,22 @@
 import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.util.Date;
 
 // Parser to validate value for different type
 public abstract class ColumnParser implements Serializable {
 
     protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
 
+    // thread safe formatter
+    public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("YYYY-MM-DD");

Review comment:
       Thanks for sharing.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org