Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/03/09 11:31:24 UTC

[GitHub] [incubator-doris] wangbo opened a new pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

wangbo opened a new pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict
URL: https://github.com/apache/incubator-doris/pull/3063
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict
URL: https://github.com/apache/incubator-doris/pull/3063#discussion_r392808729
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/loadv2/BuildGlobalDict.java
 ##########
 @@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Column;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ *  used for build hive global dict and encode source hive table
+ *
+ *  input: a source hive table
+ *  output: a intermediate hive table whose distinct column is encode with int value
+ *
+ *  usage example
+ *  step1,create a intermediate hive table
+ *      BuildGlobalDict.createHiveIntermediateTable()
+ *  step2, get distinct column's value
+ *      BuildGlobalDict.extractDistinctColumn()
+ *  step3, build global dict
+ *      BuildGlobalDict.buildGlobalDict()
+ *  step4, encode intermediate hive table with global dict
+ *      BuildGlobalDict.encodeDorisIntermediateHiveTable()
+ */
+
+public class BuildGlobalDict {
 
 Review comment:
   How about `GlobalDictBuilder`?



[GitHub] [incubator-doris] kangkaisen commented on issue #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
kangkaisen commented on issue #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict
URL: https://github.com/apache/incubator-doris/pull/3063#issuecomment-614732854
 
 
   @wangbo Hi, please update this PR when you have time.



[GitHub] [incubator-doris] morningman commented on issue #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
morningman commented on issue #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict
URL: https://github.com/apache/incubator-doris/pull/3063#issuecomment-599387357
 
 
   Draw a flow, just for review.
   
   ```
                                                  +-----+
                                                  |Doris|
                                                  +--^--+
                                                     |
                                    5. Load to Doris |      2. For each distinct column,
                                                     |         Create a GroupBy table
                                                     |
                                                     |                    +------------+
                                                     |     +-------------->GroupBy Tbl1|
                       1. Extract/Transform          |     |              +------------+
   +-------------------+                    +--------+-----+---+          +------------+
   | Source Data Table +-------------------->intermediate Table+---------->GroupBy Tbl2|
   +-------------------+                    +--------^-----+---+          +------------+
                                                     |     |              +------------+
                                                     |     +-------------->GroupBy Tbl3|
                                                     |                    +-----+------+
                     4. Use updated GlobalDictTable  |                          |
                     to update intermediate table.   |                          | 3. Use GroupBy table to
                     (change distinct column's value |                          | update GlobalDict Table
                     to encoded integer)             |                          |
                                                     |                  +-------v-------+
                                                     +------------------+GlobalDictTable|
                                                                        +---------------+
   
   ```
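
The flow above can be sketched in plain Java, without Spark or Hive. This is only an illustration under stated assumptions: the class `GlobalDictFlowSketch` and its methods are hypothetical names that are not part of the PR, in-memory collections stand in for the Hive tables, and new values are assumed to receive the next free id after the current maximum in the dictionary (starting at 1 when the dictionary is empty).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory sketch of steps 2-4 of the flow; not the PR's implementation.
class GlobalDictFlowSketch {

    // Step 2: collect the distinct values of one column (the "GroupBy table").
    static Set<String> distinctValues(List<String> column) {
        return new LinkedHashSet<>(column);
    }

    // Step 3: add every value that is not yet in the dict,
    // assigning ids that continue after the current max id (assumption).
    static void updateDict(Map<String, Long> dict, Set<String> distinct) {
        long next = dict.values().stream().mapToLong(Long::longValue).max().orElse(0L) + 1;
        for (String value : distinct) {
            if (!dict.containsKey(value)) {
                dict.put(value, next++);
            }
        }
    }

    // Step 4: replace each original value with its encoded integer.
    static List<Long> encode(List<String> column, Map<String, Long> dict) {
        List<Long> encoded = new ArrayList<>(column.size());
        for (String value : column) {
            encoded.add(dict.get(value));
        }
        return encoded;
    }

    public static void main(String[] args) {
        // The dict already holds one value from an earlier load.
        Map<String, Long> dict = new LinkedHashMap<>();
        dict.put("beijing", 1L);

        List<String> column = Arrays.asList("shanghai", "beijing", "shanghai", "shenzhen");
        updateDict(dict, distinctValues(column));

        System.out.println(dict);
        System.out.println(encode(column, dict));
    }
}
```

Existing entries keep their ids across loads, so values encoded by an earlier job stay valid; only unseen values extend the dictionary.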






[GitHub] [incubator-doris] wangbo commented on a change in pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict
URL: https://github.com/apache/incubator-doris/pull/3063#discussion_r398344270
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/loadv2/BuildGlobalDict.java
 ##########
 @@ -0,0 +1,272 @@
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Column;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ *  used for build hive global dict and encode source hive table
+ *
+ *  input: a source hive table
+ *  output: a intermediate hive table whose distinct column is encode with int value
+ *
+ *  usage example
+ *  step1,create a intermediate hive table
+ *      BuildGlobalDict.createHiveIntermediateTable()
+ *  step2, get distinct column's value
+ *      BuildGlobalDict.extractDistinctColumn()
+ *  step3, build global dict
+ *      BuildGlobalDict.buildGlobalDict()
+ *  step4, encode intermediate hive table with global dict
+ *      BuildGlobalDict.encodeDorisIntermediateHiveTable()
+ */
+
+public class BuildGlobalDict {
 
 Review comment:
   Sounds great.



[GitHub] [incubator-doris] wyb commented on a change in pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
wyb commented on a change in pull request #3063:
URL: https://github.com/apache/incubator-doris/pull/3063#discussion_r439381355



##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
##########
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ *  used for build hive global dict and encode source hive table
+ *
+ *  input: a source hive table
+ *  output: a intermediate hive table whose distinct column is encode with int value
+ *
+ *  usage example
+ *  step1,create a intermediate hive table
+ *      BuildGlobalDict.createHiveIntermediateTable()
+ *  step2, get distinct column's value
+ *      BuildGlobalDict.extractDistinctColumn()
+ *  step3, build global dict
+ *      BuildGlobalDict.buildGlobalDict()
+ *  step4, encode intermediate hive table with global dict
+ *      BuildGlobalDict.encodeDorisIntermediateHiveTable()
+ */
+
+public class GlobalDictBuilder {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(GlobalDictBuilder.class);
+
+    // name of the column in doris table which need to build global dict
+    // for example: some dict columns a,b,c
+    // case 1: all dict columns has no relation, then the map is as below
+    //     [a=null, b=null, c=null]
+    // case 2: column a's value can reuse column b's value which means column a's value is a subset of column b's value
+    //  [b=a,c=null]
+    private MultiValueMap dictColumn;
+    // target doris table columns in current spark load job
+    private List<String> dorisOlapTableColumnList;
+
+    // distinct columns which need to use map join to solve data skew in encodeDorisIntermediateHiveTable()
+    // we needn't to specify it until data skew happends
+    private List<String> mapSideJoinColumns;
+
+    // hive table datasource,format is db.table
+    private String sourceHiveDBTableName;
+    // user-specified filter when query sourceHiveDBTable
+    private String sourceHiveFilter;
+    // intermediate hive table to store the distinct value of distinct column
+    private String distinctKeyTableName;
+    // current doris table's global dict hive table
+    private String globalDictTableName;
+
+    // used for next step to read
+    private String dorisIntermediateHiveTable;
+    private SparkSession spark;
+
+    // key=doris column name,value=column type
+    private Map<String, String> dorisColumnNameTypeMap = new HashMap<>();
+
+    // column in this list means need split distinct value and then encode respectively
+    // to avoid the performance bottleneck to transfer origin value to dict value
+    private List<String> veryHighCardinalityColumn;
+    // determine the split num of new distinct value,better can be divisible by 1
+    private int veryHighCardinalityColumnSplitNum;
+
+    private ExecutorService pool;
+
+    private StructType distinctValueSchema;
+
+    public GlobalDictBuilder(MultiValueMap dictColumn,
+                             List<String> dorisOlapTableColumnList,
+                             List<String> mapSideJoinColumns,
+                             String sourceHiveDBTableName,
+                             String sourceHiveFilter,
+                             String dorisHiveDB,
+                             String distinctKeyTableName,
+                             String globalDictTableName,
+                             String dorisIntermediateHiveTable,
+                             int buildConcurrency,
+                             List<String> veryHighCardinalityColumn,
+                             int veryHighCardinalityColumnSplitNum,
+                             SparkSession spark) {
+        this.dictColumn = dictColumn;
+        this.dorisOlapTableColumnList = dorisOlapTableColumnList;
+        this.mapSideJoinColumns = mapSideJoinColumns;
+        this.sourceHiveDBTableName = sourceHiveDBTableName;
+        this.sourceHiveFilter = sourceHiveFilter;
+        this.distinctKeyTableName = distinctKeyTableName;
+        this.globalDictTableName = globalDictTableName;
+        this.dorisIntermediateHiveTable = dorisIntermediateHiveTable;
+        this.spark = spark;
+        this.pool = Executors.newFixedThreadPool(buildConcurrency < 0 ? 1 : buildConcurrency);
+        this.veryHighCardinalityColumn = veryHighCardinalityColumn;
+        this.veryHighCardinalityColumnSplitNum = veryHighCardinalityColumnSplitNum;
+
+        spark.sql("use " + dorisHiveDB);
+    }
+
+    public void createHiveIntermediateTable() throws AnalysisException {
+        Map<String, String> sourceHiveTableColumn = spark.catalog()
+                .listColumns(sourceHiveDBTableName)
+                .collectAsList()
+                .stream().collect(Collectors.toMap(Column::name, Column::dataType));
+
+        Map<String, String> sourceHiveTableColumnInLowercase = new HashMap<>();
+        for (Map.Entry<String, String> entry : sourceHiveTableColumn.entrySet()) {
+            sourceHiveTableColumnInLowercase.put(entry.getKey().toLowerCase(), entry.getValue().toLowerCase());
+        }
+
+        // check and get doris column type in hive
+        dorisOlapTableColumnList.stream().forEach(columnName -> {
+            String columnType = sourceHiveTableColumnInLowercase.get(columnName);

Review comment:
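       If a column name in the Doris table is uppercase, this lookup will fail, because the keys of `sourceHiveTableColumnInLowercase` are all lowercase.
   Maybe you can use a TreeMap (for example, with a case-insensitive comparator) for the source Hive table columns.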
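
One way to read the TreeMap suggestion, as a minimal sketch: build the lookup map with `String.CASE_INSENSITIVE_ORDER`, so `get` matches regardless of the case used by either side. The class and method names below are hypothetical and not part of the PR; the column names and types are made-up sample data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating a case-insensitive column lookup.
class CaseInsensitiveColumns {

    // Copy the source columns into a TreeMap whose comparator ignores case.
    // If two source keys differ only by case, the one inserted last wins.
    static Map<String, String> caseInsensitive(Map<String, String> sourceColumns) {
        Map<String, String> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        result.putAll(sourceColumns);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> hiveColumns = new HashMap<>();
        hiveColumns.put("user_id", "bigint");
        hiveColumns.put("City", "string");

        Map<String, String> lookup = caseInsensitive(hiveColumns);

        // Both lookups succeed even though the case differs from the stored key.
        System.out.println(lookup.get("USER_ID"));
        System.out.println(lookup.get("city"));
    }
}
```

With this approach the separate lowercased copy of the map is no longer needed for the name lookup, although the type strings would still have to be lowercased if later code depends on that.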

##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
##########
@@ -0,0 +1,413 @@
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ *  used for build hive global dict and encode source hive table
+ *
+ *  input: a source hive table
+ *  output: a intermediate hive table whose distinct column is encode with int value
+ *
+ *  usage example
+ *  step1,create a intermediate hive table
+ *      BuildGlobalDict.createHiveIntermediateTable()
+ *  step2, get distinct column's value
+ *      BuildGlobalDict.extractDistinctColumn()
+ *  step3, build global dict
+ *      BuildGlobalDict.buildGlobalDict()
+ *  step4, encode intermediate hive table with global dict
+ *      BuildGlobalDict.encodeDorisIntermediateHiveTable()
+ */
+
+public class GlobalDictBuilder {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(GlobalDictBuilder.class);
+
+    // name of the column in doris table which need to build global dict
+    // for example: some dict columns a,b,c
+    // case 1: all dict columns has no relation, then the map is as below
+    //     [a=null, b=null, c=null]
+    // case 2: column a's value can reuse column b's value which means column a's value is a subset of column b's value
+    //  [b=a,c=null]
+    private MultiValueMap dictColumn;
+    // target doris table columns in current spark load job
+    private List<String> dorisOlapTableColumnList;
+
+    // distinct columns which need to use map join to solve data skew in encodeDorisIntermediateHiveTable()
+    // we needn't to specify it until data skew happends
+    private List<String> mapSideJoinColumns;
+
+    // hive table datasource,format is db.table
+    private String sourceHiveDBTableName;
+    // user-specified filter when query sourceHiveDBTable
+    private String sourceHiveFilter;
+    // intermediate hive table to store the distinct value of distinct column
+    private String distinctKeyTableName;
+    // current doris table's global dict hive table
+    private String globalDictTableName;
+
+    // used for next step to read
+    private String dorisIntermediateHiveTable;
+    private SparkSession spark;
+
+    // key=doris column name,value=column type
+    private Map<String, String> dorisColumnNameTypeMap = new HashMap<>();
+
+    // column in this list means need split distinct value and then encode respectively
+    // to avoid the performance bottleneck to transfer origin value to dict value
+    private List<String> veryHighCardinalityColumn;
+    // determine the split num of new distinct value,better can be divisible by 1
+    private int veryHighCardinalityColumnSplitNum;
+
+    private ExecutorService pool;
+
+    private StructType distinctValueSchema;
+
+    public GlobalDictBuilder(MultiValueMap dictColumn,
+                             List<String> dorisOlapTableColumnList,
+                             List<String> mapSideJoinColumns,
+                             String sourceHiveDBTableName,
+                             String sourceHiveFilter,
+                             String dorisHiveDB,
+                             String distinctKeyTableName,
+                             String globalDictTableName,
+                             String dorisIntermediateHiveTable,
+                             int buildConcurrency,
+                             List<String> veryHighCardinalityColumn,
+                             int veryHighCardinalityColumnSplitNum,
+                             SparkSession spark) {
+        this.dictColumn = dictColumn;
+        this.dorisOlapTableColumnList = dorisOlapTableColumnList;
+        this.mapSideJoinColumns = mapSideJoinColumns;
+        this.sourceHiveDBTableName = sourceHiveDBTableName;
+        this.sourceHiveFilter = sourceHiveFilter;
+        this.distinctKeyTableName = distinctKeyTableName;
+        this.globalDictTableName = globalDictTableName;
+        this.dorisIntermediateHiveTable = dorisIntermediateHiveTable;
+        this.spark = spark;
+        this.pool = Executors.newFixedThreadPool(buildConcurrency < 0 ? 1 : buildConcurrency);
+        this.veryHighCardinalityColumn = veryHighCardinalityColumn;
+        this.veryHighCardinalityColumnSplitNum = veryHighCardinalityColumnSplitNum;
+
+        spark.sql("use " + dorisHiveDB);
+    }
+
+    public void createHiveIntermediateTable() throws AnalysisException {
+        Map<String, String> sourceHiveTableColumn = spark.catalog()
+                .listColumns(sourceHiveDBTableName)
+                .collectAsList()
+                .stream().collect(Collectors.toMap(Column::name, Column::dataType));
+
+        Map<String, String> sourceHiveTableColumnInLowercase = new HashMap<>();
+        for (Map.Entry<String, String> entry : sourceHiveTableColumn.entrySet()) {
+            sourceHiveTableColumnInLowercase.put(entry.getKey().toLowerCase(), entry.getValue().toLowerCase());
+        }
+
+        // check and get doris column type in hive
+        dorisOlapTableColumnList.stream().forEach(columnName -> {
+            String columnType = sourceHiveTableColumnInLowercase.get(columnName);
+            if (StringUtils.isEmpty(columnType)) {
+                throw new RuntimeException(String.format("doris column %s not in source hive table", columnName));
+            }
+            dorisColumnNameTypeMap.put(columnName, columnType);
+        });
+
+        spark.sql(String.format("drop table if exists %s ", dorisIntermediateHiveTable));
+        // create IntermediateHiveTable
+        spark.sql(getCreateIntermediateHiveTableSql());
+
+        // insert data to IntermediateHiveTable
+        spark.sql(getInsertIntermediateHiveTableSql());
+    }
+
+    public void extractDistinctColumn() {
+        // create distinct tables
+        spark.sql(getCreateDistinctKeyTableSql());
+
+        // extract distinct column
+        List<GlobalDictBuildWorker> workerList = new ArrayList<>();
+        // For the column in dictColumns's valueSet, their value is a subset of column in keyset,
+        // so we don't need to extract distinct value of column in valueSet
+        for (Object column : dictColumn.keySet()) {
+            workerList.add(()->{
+                spark.sql(getInsertDistinctKeyTableSql(column.toString(), dorisIntermediateHiveTable));
+            });
+        }
+
+        submitWorker(workerList);
+    }
+
+    public void buildGlobalDict() throws ExecutionException, InterruptedException {
+        // create global dict hive table
+        spark.sql(getCreateGlobalDictHiveTableSql());
+
+        List<GlobalDictBuildWorker> globalDictBuildWorkers = new ArrayList<>();
+        for (Object distinctColumnNameOrigin : dictColumn.keySet()) {
+            String distinctColumnNameTmp = distinctColumnNameOrigin.toString();
+            globalDictBuildWorkers.add(()->{
+                // get global dict max value
+                List<Row> maxGlobalDictValueRow = spark.sql(getMaxGlobalDictValueSql(distinctColumnNameTmp)).collectAsList();
+                if (maxGlobalDictValueRow.size() == 0) {
+                    throw new RuntimeException(String.format("get max dict value failed: %s", distinctColumnNameTmp));
+                }
+
+                long maxDictValue = 0;
+                long minDictValue = 0;
+                Row row = maxGlobalDictValueRow.get(0);
+                if (row != null && row.get(0) != null) {
+                    maxDictValue = (long)row.get(0);
+                    minDictValue = (long)row.get(1);
+                }
+                LOG.info(" column {} 's max value in dict is {} , min value is {}", distinctColumnNameTmp, maxDictValue, minDictValue);
+                // maybe never happened, but we need detect it
+                if (minDictValue < 0) {
+                    throw new RuntimeException(String.format(" column %s 's cardinality has exceed bigint's max value", distinctColumnNameTmp));
+                }
+
+                if (veryHighCardinalityColumn.contains(distinctColumnNameTmp) && veryHighCardinalityColumnSplitNum > 1) {
+                    // split distinct key first and then encode with count
+                    buildGlobalDictBySplit(maxDictValue, distinctColumnNameTmp);
+                } else {
+                    // build global dict directly
+                    spark.sql(getBuildGlobalDictSql(maxDictValue, distinctColumnNameTmp));
+                }
+
+            });
+        }
+        submitWorker(globalDictBuildWorkers);
+    }
+
+    // encode dorisIntermediateHiveTable's distinct column
+    public void encodeDorisIntermediateHiveTable() {
+        for (Object distinctColumnObj  : dictColumn.keySet()) {

Review comment:
       There are two spaces before the colon here; use one.

##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
##########
@@ -0,0 +1,413 @@
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ *  Builds the hive global dict and encodes the source hive table.
+ *
+ *  input: a source hive table
+ *  output: an intermediate hive table whose distinct columns are encoded as integer values
+ *
+ *  usage example
+ *  step1, create an intermediate hive table
+ *      BuildGlobalDict.createHiveIntermediateTable()
+ *  step2, get distinct column's value
+ *      BuildGlobalDict.extractDistinctColumn()
+ *  step3, build global dict
+ *      BuildGlobalDict.buildGlobalDict()
+ *  step4, encode intermediate hive table with global dict
+ *      BuildGlobalDict.encodeDorisIntermediateHiveTable()
+ */
+
+public class GlobalDictBuilder {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(GlobalDictBuilder.class);
+
+    // names of the columns in the doris table that need a global dict
+    // for example: some dict columns a,b,c
+    // case 1: the dict columns are unrelated, then the map is as below
+    //     [a=null, b=null, c=null]
+    // case 2: column a can reuse column b's dict, which means column a's values are a subset of column b's values
+    //  [b=a,c=null]
+    private MultiValueMap dictColumn;
+    // target doris table columns in current spark load job
+    private List<String> dorisOlapTableColumnList;
+
+    // distinct columns that use a map join to work around data skew in encodeDorisIntermediateHiveTable()
+    // there is no need to specify it unless data skew happens
+    private List<String> mapSideJoinColumns;
+
+    // hive table datasource,format is db.table
+    private String sourceHiveDBTableName;
+    // user-specified filter applied when querying sourceHiveDBTable
+    private String sourceHiveFilter;
+    // intermediate hive table to store the distinct value of distinct column
+    private String distinctKeyTableName;
+    // current doris table's global dict hive table
+    private String globalDictTableName;
+
+    // used for next step to read
+    private String dorisIntermediateHiveTable;
+    private SparkSession spark;
+
+    // key=doris column name,value=column type
+    private Map<String, String> dorisColumnNameTypeMap = new HashMap<>();
+
+    // columns in this list have their new distinct values split and encoded separately,
+    // to avoid a performance bottleneck when mapping original values to dict values
+    private List<String> veryHighCardinalityColumn;
+    // determine the split num of new distinct value,better can be divisible by 1
+    private int veryHighCardinalityColumnSplitNum;
+
+    private ExecutorService pool;
+
+    private StructType distinctValueSchema;
+
+    public GlobalDictBuilder(MultiValueMap dictColumn,
+                             List<String> dorisOlapTableColumnList,
+                             List<String> mapSideJoinColumns,
+                             String sourceHiveDBTableName,
+                             String sourceHiveFilter,
+                             String dorisHiveDB,
+                             String distinctKeyTableName,
+                             String globalDictTableName,
+                             String dorisIntermediateHiveTable,
+                             int buildConcurrency,
+                             List<String> veryHighCardinalityColumn,
+                             int veryHighCardinalityColumnSplitNum,
+                             SparkSession spark) {
+        this.dictColumn = dictColumn;
+        this.dorisOlapTableColumnList = dorisOlapTableColumnList;
+        this.mapSideJoinColumns = mapSideJoinColumns;
+        this.sourceHiveDBTableName = sourceHiveDBTableName;
+        this.sourceHiveFilter = sourceHiveFilter;
+        this.distinctKeyTableName = distinctKeyTableName;
+        this.globalDictTableName = globalDictTableName;
+        this.dorisIntermediateHiveTable = dorisIntermediateHiveTable;
+        this.spark = spark;
+        this.pool = Executors.newFixedThreadPool(buildConcurrency <= 0 ? 1 : buildConcurrency);
+        this.veryHighCardinalityColumn = veryHighCardinalityColumn;
+        this.veryHighCardinalityColumnSplitNum = veryHighCardinalityColumnSplitNum;
+
+        spark.sql("use " + dorisHiveDB);
+    }
+
+    public void createHiveIntermediateTable() throws AnalysisException {
+        Map<String, String> sourceHiveTableColumn = spark.catalog()
+                .listColumns(sourceHiveDBTableName)
+                .collectAsList()
+                .stream().collect(Collectors.toMap(Column::name, Column::dataType));
+
+        Map<String, String> sourceHiveTableColumnInLowercase = new HashMap<>();
+        for (Map.Entry<String, String> entry : sourceHiveTableColumn.entrySet()) {
+            sourceHiveTableColumnInLowercase.put(entry.getKey().toLowerCase(), entry.getValue().toLowerCase());
+        }
+
+        // check and get doris column type in hive
+        dorisOlapTableColumnList.stream().forEach(columnName -> {
+            String columnType = sourceHiveTableColumnInLowercase.get(columnName);
+            if (StringUtils.isEmpty(columnType)) {
+                throw new RuntimeException(String.format("doris column %s not in source hive table", columnName));
+            }
+            dorisColumnNameTypeMap.put(columnName, columnType);
+        });
+
+        spark.sql(String.format("drop table if exists %s ", dorisIntermediateHiveTable));
+        // create IntermediateHiveTable
+        spark.sql(getCreateIntermediateHiveTableSql());
+
+        // insert data to IntermediateHiveTable
+        spark.sql(getInsertIntermediateHiveTableSql());
+    }
+
+    public void extractDistinctColumn() {
+        // create distinct tables
+        spark.sql(getCreateDistinctKeyTableSql());
+
+        // extract distinct column
+        List<GlobalDictBuildWorker> workerList = new ArrayList<>();
+        // The values of the columns in dictColumn's value set are a subset of the values of their key column,
+        // so we don't need to extract distinct values for the columns in the value set
+        for (Object column : dictColumn.keySet()) {
+            workerList.add(()->{
+                spark.sql(getInsertDistinctKeyTableSql(column.toString(), dorisIntermediateHiveTable));
+            });
+        }
+
+        submitWorker(workerList);
+    }
+
+    public void buildGlobalDict() throws ExecutionException, InterruptedException {
+        // create global dict hive table
+        spark.sql(getCreateGlobalDictHiveTableSql());
+
+        List<GlobalDictBuildWorker> globalDictBuildWorkers = new ArrayList<>();
+        for (Object distinctColumnNameOrigin : dictColumn.keySet()) {
+            String distinctColumnNameTmp = distinctColumnNameOrigin.toString();
+            globalDictBuildWorkers.add(()->{
+                // get global dict max value
+                List<Row> maxGlobalDictValueRow = spark.sql(getMaxGlobalDictValueSql(distinctColumnNameTmp)).collectAsList();
+                if (maxGlobalDictValueRow.size() == 0) {
+                    throw new RuntimeException(String.format("get max dict value failed: %s", distinctColumnNameTmp));
+                }
+
+                long maxDictValue = 0;
+                long minDictValue = 0;
+                Row row = maxGlobalDictValueRow.get(0);
+                if (row != null && row.get(0) != null) {
+                    maxDictValue = (long)row.get(0);
+                    minDictValue = (long)row.get(1);
+                }
+                LOG.info(" column {} 's max value in dict is {} , min value is {}", distinctColumnNameTmp, maxDictValue, minDictValue);
+                // should never happen, but detect it anyway
+                if (minDictValue < 0) {
+                    throw new RuntimeException(String.format(" column %s 's cardinality has exceeded bigint's max value", distinctColumnNameTmp));
+                }
+
+                if (veryHighCardinalityColumn.contains(distinctColumnNameTmp) && veryHighCardinalityColumnSplitNum > 1) {
+                    // split distinct key first and then encode with count
+                    buildGlobalDictBySplit(maxDictValue, distinctColumnNameTmp);
+                } else {
+                    // build global dict directly
+                    spark.sql(getBuildGlobalDictSql(maxDictValue, distinctColumnNameTmp));
+                }
+
+            });
+        }
+        submitWorker(globalDictBuildWorkers);
+    }
+
+    // encode dorisIntermediateHiveTable's distinct column
+    public void encodeDorisIntermediateHiveTable() {
+        for (Object distinctColumnObj  : dictColumn.keySet()) {
+            spark.sql(getEncodeDorisIntermediateHiveTableSql(distinctColumnObj.toString(), (ArrayList)dictColumn.get(distinctColumnObj.toString())));
+        }
+    }
+
+    private String getCreateIntermediateHiveTableSql() {
+        StringBuilder sql = new StringBuilder();
+        sql.append("create table if not exists " + dorisIntermediateHiveTable + " ( ");
+
+        Set<String> allDictColumn = new HashSet<>();
+        allDictColumn.addAll(dictColumn.keySet());
+        allDictColumn.addAll(dictColumn.values());
+        dorisOlapTableColumnList.stream().forEach(columnName -> {
+            sql.append(columnName).append(" ");
+            if (allDictColumn.contains(columnName)) {
+                sql.append(" string ,");
+            } else {
+                sql.append(dorisColumnNameTypeMap.get(columnName)).append(" ,");
+            }
+        });
+        return sql.deleteCharAt(sql.length() - 1).append(" )").append(" stored as sequencefile ").toString();
+    }
+
+    private String getInsertIntermediateHiveTableSql() {
+        StringBuilder sql = new StringBuilder();
+        sql.append("insert overwrite table ").append(dorisIntermediateHiveTable).append(" select ");
+        dorisOlapTableColumnList.stream().forEach(columnName -> {
+            sql.append(columnName).append(" ,");
+        });
+        sql.deleteCharAt(sql.length() - 1)
+                .append(" from ").append(sourceHiveDBTableName);
+        if (!StringUtils.isEmpty(sourceHiveFilter)) {
+            sql.append(" where ").append(sourceHiveFilter);
+        }
+        return sql.toString();
+    }
+
+    private String getCreateDistinctKeyTableSql() {
+        return "create table if not exists " + distinctKeyTableName + "(dict_key string) partitioned by (dict_column string) stored as sequencefile ";
+    }
+
+    private String getInsertDistinctKeyTableSql(String distinctColumnName, String sourceHiveTable) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("insert overwrite table ").append(distinctKeyTableName)
+                .append(" partition(dict_column='").append(distinctColumnName).append("')")
+                .append(" select ").append(distinctColumnName)
+                .append(" from ").append(sourceHiveTable)
+                .append(" group by ").append(distinctColumnName);
+        return sql.toString();
+    }
+
+    private String getCreateGlobalDictHiveTableSql() {
+        return "create table if not exists " + globalDictTableName
+                + "(dict_key string, dict_value bigint) partitioned by(dict_column string) stored as sequencefile ";
+    }
+
+    private String getMaxGlobalDictValueSql(String distinctColumnName) {
+        return "select max(dict_value) as max_value,min(dict_value) as min_value from " + globalDictTableName + " where dict_column='" + distinctColumnName + "'";
+    }
+
+    private void buildGlobalDictBySplit(long maxGlobalDictValue, String distinctColumnName) {
+        // 1. get distinct value
+        Dataset<Row> newDistinctValue = spark.sql(getNewDistinctValue(distinctColumnName));
+
+        // 2. split the newDistinctValue to avoid window functions' single node bottleneck
+        Dataset<Row>[] splitedDistinctValue = newDistinctValue.randomSplit(getRandomSplitWeights());
+        long currentMaxDictValue = maxGlobalDictValue;
+        Map<String, Long> distinctKeyMap = new HashMap<>();
+
+        for (int i = 0; i < splitedDistinctValue.length; i++) {
+            long currentDatasetStartDictValue = currentMaxDictValue;
+            long splitDistinctValueCount = splitedDistinctValue[i].count();
+            currentMaxDictValue += splitDistinctValueCount;
+            String tmpDictTableName = String.format("%s_%s_tmp_dict_%s", i, currentDatasetStartDictValue, distinctColumnName);
+            distinctKeyMap.put(tmpDictTableName, currentDatasetStartDictValue);
+            Dataset<Row> distinctValueFrame = spark.createDataFrame(splitedDistinctValue[i].toJavaRDD(), getDistinctValueSchema());
+            distinctValueFrame.createOrReplaceTempView(tmpDictTableName);
+        }
+
+        spark.sql(getSplitBuildGlobalDictSql(distinctKeyMap, distinctColumnName));
+
+    }
+
+    private String getSplitBuildGlobalDictSql(Map<String, Long> distinctKeyMap, String distinctColumnName) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("insert overwrite table ").append(globalDictTableName).append(" partition(dict_column='").append(distinctColumnName).append("') ")
+                .append(" select dict_key,dict_value from ").append(globalDictTableName).append(" where dict_column='").append(distinctColumnName).append("' ");
+        for (Map.Entry<String, Long> entry : distinctKeyMap.entrySet()) {
+            sql.append(" union all select dict_key, (row_number() over(order by dict_key)) ")
+                    .append(String.format(" +(%s) as dict_value from %s", entry.getValue(), entry.getKey()));
+        }
+        return sql.toString();
+    }
+
+    private StructType getDistinctValueSchema() {
+        if (distinctValueSchema == null) {
+            List<StructField> fieldList = new ArrayList<>();
+            fieldList.add(DataTypes.createStructField("dict_key", DataTypes.StringType, false));
+            distinctValueSchema = DataTypes.createStructType(fieldList);
+        }
+        return distinctValueSchema;
+    }
+
+    private double[] getRandomSplitWeights() {
+        double[] weights = new double[veryHighCardinalityColumnSplitNum];
+        double weight = 1.0 / veryHighCardinalityColumnSplitNum;
+        Arrays.fill(weights, weight);
+        return weights;
+    }
+
+    private String getBuildGlobalDictSql(long maxGlobalDictValue, String distinctColumnName) {
+        return "insert overwrite table " + globalDictTableName + " partition(dict_column='" + distinctColumnName + "') "
+                + " select dict_key,dict_value from " + globalDictTableName + " where dict_column='" + distinctColumnName + "' "
+                + " union all select t1.dict_key as dict_key,(row_number() over(order by t1.dict_key)) + (" + maxGlobalDictValue + ") as dict_value from "
+                + "(select dict_key from " + distinctKeyTableName + " where dict_column='" + distinctColumnName + "' and dict_key is not null)t1 left join "
+                + " (select dict_key,dict_value from " + globalDictTableName + " where dict_column='" + distinctColumnName + "' )t2 " +
+                "on t1.dict_key = t2.dict_key where t2.dict_value is null";
+    }
+
+    private String getNewDistinctValue(String distinctColumnName) {
+        return  "select t1.dict_key from " +
+                " (select dict_key from " + distinctKeyTableName + " where dict_column='" + distinctColumnName + "' and dict_key is not null)t1 left join " +
+                " (select dict_key,dict_value from " + globalDictTableName + " where dict_column='" + distinctColumnName + "' )t2 " +
+                "on t1.dict_key = t2.dict_key where t2.dict_value is null";
+
+    }
+
+    private String getEncodeDorisIntermediateHiveTableSql(String dictColumn, ArrayList<String> childColumn) {

Review comment:
       Better to use `List<String> childColumn` here.
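
A minimal sketch of the suggestion above, assuming the map lookup hands back some collection of child column names that may be null or contain null entries (as in the `[a=null, b=null, c=null]` case). The class `ChildColumnExample` and the helpers `toChildColumnList` and `describe` are hypothetical and only illustrate declaring the parameter as `List<String>` instead of casting to `ArrayList`; they are not part of the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class ChildColumnExample {

    // Hypothetical helper: copies whatever collection the lookup returned
    // (possibly null) into a List<String>, so no ArrayList cast is needed.
    public static List<String> toChildColumnList(Collection<?> rawChildren) {
        if (rawChildren == null) {
            return Collections.emptyList();
        }
        List<String> children = new ArrayList<>();
        for (Object child : rawChildren) {
            // a null entry means "this dict column has no child column"
            if (child != null) {
                children.add(child.toString());
            }
        }
        return children;
    }

    // Stand-in for a method such as getEncodeDorisIntermediateHiveTableSql:
    // the parameter is typed against the List interface, not ArrayList.
    public static String describe(String dictColumn, List<String> childColumn) {
        return dictColumn + " -> " + childColumn;
    }

    public static void main(String[] args) {
        List<Object> raw = new ArrayList<>();
        raw.add("a");
        System.out.println(describe("b", toChildColumnList(raw)));  // b -> [a]
        System.out.println(describe("c", toChildColumnList(null))); // c -> []
    }
}
```

Typing the parameter as `List<String>` lets callers pass any list implementation and removes the unchecked `(ArrayList)` cast at the call site.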






[GitHub] [incubator-doris] wangbo closed pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
wangbo closed pull request #3063:
URL: https://github.com/apache/incubator-doris/pull/3063


   




[GitHub] [incubator-doris] morningman merged pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #3063:
URL: https://github.com/apache/incubator-doris/pull/3063


   




[GitHub] [incubator-doris] morningman commented on a change in pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #3063:
URL: https://github.com/apache/incubator-doris/pull/3063#discussion_r439456426



##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/dpp/GlobalDictBuilder.java
##########
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalog.Column;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ *  Builds the hive global dict and encodes the source hive table.
+ *
+ *  input: a source hive table
+ *  output: an intermediate hive table whose distinct columns are encoded as integer values
+ *
+ *  usage example
+ *  step1, create an intermediate hive table
+ *      BuildGlobalDict.createHiveIntermediateTable()

Review comment:
       ```suggestion
    *      GlobalDictBuilder.createHiveIntermediateTable()
   ```

##########
File path: fe/pom.xml
##########
@@ -38,7 +38,6 @@ under the License.
         <maven.compiler.target>1.8</maven.compiler.target>
         <jprotobuf.version>2.2.11</jprotobuf.version>
         <skip.plugin>false</skip.plugin>
-        <fe_ut_parallel>${env.FE_UT_PARALLEL}</fe_ut_parallel>

Review comment:
       Why is this being removed?






[GitHub] [incubator-doris] wangbo commented on pull request #3063: (#3061) [Spark Load] Doris Support Using Hive Table to Build Global Dict

Posted by GitBox <gi...@apache.org>.
wangbo commented on pull request #3063:
URL: https://github.com/apache/incubator-doris/pull/3063#issuecomment-642018644


   1. Add dict reuse to reduce the number of shuffles.
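
As a rough in-memory illustration of the idea (not the Spark SQL used in the PR): when column a's values are a subset of column b's values, the dictionary built for b can also encode a, so no second dictionary has to be built. The class `DictReuseSketch` and its methods are hypothetical, and the 1-based codes in sorted key order are only an assumption that loosely mirrors `row_number() over(order by dict_key)` on an empty dictionary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class DictReuseSketch {

    // Assigns 1-based codes to the distinct (non-null) keys in sorted order.
    public static Map<String, Long> buildDict(List<String> parentValues) {
        Map<String, Long> dict = new LinkedHashMap<>();
        long next = 1;
        for (String key : new TreeSet<>(parentValues)) {
            dict.put(key, next++);
        }
        return dict;
    }

    // Encodes any column whose values are a subset of the dictionary keys.
    public static List<Long> encode(List<String> values, Map<String, Long> dict) {
        List<Long> encoded = new ArrayList<>();
        for (String value : values) {
            Long code = dict.get(value);
            if (code == null) {
                throw new IllegalArgumentException("value not in dictionary: " + value);
            }
            encoded.add(code);
        }
        return encoded;
    }

    public static void main(String[] args) {
        List<String> columnB = Arrays.asList("u3", "u1", "u2", "u1");
        List<String> columnA = Arrays.asList("u2", "u1"); // subset of column b
        Map<String, Long> dictB = buildDict(columnB);     // built once, for b only
        System.out.println(encode(columnB, dictB)); // [3, 1, 2, 1]
        System.out.println(encode(columnA, dictB)); // [2, 1]
    }
}
```

In this sketch only one distinct-and-rank pass is needed for both columns, which is the kind of saving the comment refers to.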

