Posted to commits@doris.apache.org by ya...@apache.org on 2020/12/30 02:28:08 UTC

[incubator-doris] branch master updated: Improve the processing logic of Load statement derived columns (#5140)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 62604df  Improve the processing logic of Load statement derived columns (#5140)
62604df is described below

commit 62604dfeac82f4c3e200079ed768aa0eda79d485
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Dec 30 10:27:46 2020 +0800

    Improve the processing logic of Load statement derived columns (#5140)
    
    * support transitive in load expr
---
 .../administrator-guide/load-data/load-manual.md   |  5 ++
 docs/en/installing/compilation.md                  |  6 ++-
 .../administrator-guide/load-data/load-manual.md   |  6 +++
 docs/zh-CN/installing/compilation.md               |  4 +-
 env.sh                                             |  5 +-
 .../apache/doris/analysis/ImportColumnDesc.java    |  4 ++
 .../src/main/java/org/apache/doris/load/Load.java  | 59 +++++++++++++++++-----
 .../org/apache/doris/analysis/LoadStmtTest.java    | 33 ++++++++++++
 .../doris/planner/StreamLoadScanNodeTest.java      | 11 ----
 9 files changed, 106 insertions(+), 27 deletions(-)

diff --git a/docs/en/administrator-guide/load-data/load-manual.md b/docs/en/administrator-guide/load-data/load-manual.md
index de6bcc2..f5514e6 100644
--- a/docs/en/administrator-guide/load-data/load-manual.md
+++ b/docs/en/administrator-guide/load-data/load-manual.md
@@ -217,3 +217,8 @@ The following configuration belongs to the BE system configuration, which can be
 + label\_keep\_max\_second
 
  The retention time of load jobs that are FINISHED or CANCELLED. Records of such load jobs are kept in the Doris system for a period of time determined by this parameter. The default is 3 days. This parameter applies to all types of load jobs.
+
+### Column mapping
+Assume the source data is `1, 2, 3` and the table has three columns `c1, c2, c3`. To load the data directly into the table, you can use the statement `COLUMNS(c1,c2,c3)`, which is equivalent to `COLUMNS(tmp_c1,tmp_c2,tmp_c3,c1=tmp_c1,c2=tmp_c2,c3=tmp_c3)`.
+If you want to apply transformations or use temporary variables during the load, the transformations or temporary variables must be specified in the order in which they are used. For example, `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = tmp_c1 +1, c2= c1+1, c3 = c2+1)` is equivalent to `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = tmp_c1 +1, c2 = tmp_c1 +1+1, c3 =tmp_c1 +1+1+1)`.
+An expression must be defined before it is used. For example, the following statement is not legal: `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = c1+1, c2 = temp + 1, temp = tmp_c1 +1, c3 =c2+1)`
\ No newline at end of file
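The ordering rule in the new "Column mapping" section can be illustrated with a small, hypothetical Java model; it is not Doris code. Assumptions for this sketch: every derived column is restricted to the form `target = source + constant`, the raw fields are bound positionally to the listed source columns, and derived columns are evaluated strictly left to right. `ColumnMappingDemo`, `Derived` and `apply` are invented names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the documented COLUMNS(...) ordering rule; not Doris code.
final class ColumnMappingDemo {
    /** A derived column restricted to the form "target = source + addend". */
    static final class Derived {
        final String target;
        final String source;
        final long addend;

        Derived(String target, String source, long addend) {
            this.target = target;
            this.source = source;
            this.addend = addend;
        }
    }

    /**
     * Binds the raw fields to the source column names by position, then evaluates the
     * derived columns strictly left to right. A derived column may only read a name that
     * is already bound; a forward reference is rejected.
     */
    static Map<String, Long> apply(List<String> sourceColumns, long[] row, List<Derived> derived) {
        if (sourceColumns.size() != row.length) {
            throw new IllegalArgumentException(
                    "row has " + row.length + " fields, expected " + sourceColumns.size());
        }
        Map<String, Long> values = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            values.put(sourceColumns.get(i), row[i]);
        }
        for (Derived d : derived) {
            Long input = values.get(d.source);
            if (input == null) {
                throw new IllegalArgumentException(d.target + " uses " + d.source + " before it is defined");
            }
            values.put(d.target, input + d.addend);
        }
        return values;
    }
}
```

With the row `1, 2, 3` bound to `tmp_c1, tmp_c2, tmp_c3` and the mapping `c1 = tmp_c1 + 1, c2 = c1 + 1, c3 = c2 + 1`, this yields c1 = 2, c2 = 3, c3 = 4. Placing `c2 = temp + 1` ahead of `temp = tmp_c1 + 1` makes `apply` throw, matching the illegal example in the documentation.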
diff --git a/docs/en/installing/compilation.md b/docs/en/installing/compilation.md
index 3cb7c5a..b4369c4 100644
--- a/docs/en/installing/compilation.md
+++ b/docs/en/installing/compilation.md
@@ -96,8 +96,12 @@ You can try to compile Doris directly in your own Linux environment.
     `GCC 5.3.1+, Oracle JDK 1.8+, Python 2.7+, Apache Maven 3.5+, CMake 3.11+`
 
     If you are using Ubuntu 16.04 or newer, you can use the following command to install the dependencies
+
+    `sudo apt-get install build-essential openjdk-8-jdk maven cmake byacc flex automake libtool-bin bison binutils-dev libiberty-dev zip unzip libncurses5-dev curl git ninja-build python`
     
-    `sudo apt-get install build-essential openjdk-8-jdk maven cmake byacc flex automake libtool-bin bison binutils-dev libiberty-dev zip unzip libncurses5-dev`
+    If you are using CentOS, you can use the following command to install the dependencies
+
+    `sudo yum groupinstall 'Development Tools' && sudo yum install maven cmake byacc flex automake libtool bison binutils-devel zip unzip ncurses-devel curl git wget python2 glibc-static libstdc++-static`
 
     After installation, set environment variables `PATH`, `JAVA_HOME`, etc.
 
diff --git a/docs/zh-CN/administrator-guide/load-data/load-manual.md b/docs/zh-CN/administrator-guide/load-data/load-manual.md
index 1584d25..eebc167 100644
--- a/docs/zh-CN/administrator-guide/load-data/load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/load-manual.md
@@ -215,3 +215,9 @@ Doris currently divides its load methods into two categories, synchronous and asynchronous. If an external program
 + label\_keep\_max\_second
   
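     Sets the retention time of load job records. Records of completed (FINISHED or CANCELLED) load jobs are kept in the Doris system for a period of time determined by this parameter. The default is 3 days. This parameter applies to all types of load jobs.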
+
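+### Column mapping
+  Assume the source data is `1,2,3` and the table has three columns `c1,c2,c3`. To load the data directly into the table, you can use the statement `COLUMNS(c1,c2,c3)`, which is equivalent to `COLUMNS(tmp_c1,tmp_c2,tmp_c3,c1=tmp_c1,c2=tmp_c2,c3=tmp_c3)`.
+If you want to apply transformations or use temporary variables during the load, the transformations or temporary variables must be specified in the order in which they are used. For example, `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = tmp_c1 +1, c2= c1+1, c3 =c2+1)` is equivalent to `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = tmp_c1 +1, c2= tmp_c1 +1+1, c3 =tmp_c1 +1+1+1)`.
+An expression must be defined before it is used. For example, the following statement is not legal: `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = c1+1, c2 = temp + 1, temp = tmp_c1 +1, c3 =c2+1)`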
+
diff --git a/docs/zh-CN/installing/compilation.md b/docs/zh-CN/installing/compilation.md
index bb66839..6a45e85 100644
--- a/docs/zh-CN/installing/compilation.md
+++ b/docs/zh-CN/installing/compilation.md
@@ -97,7 +97,9 @@ under the License.
 
     If you are using Ubuntu 16.04 or later, you can run the following command to install the dependencies
     
-    `sudo apt-get install build-essential openjdk-8-jdk maven cmake byacc flex automake libtool-bin bison binutils-dev libiberty-dev zip unzip libncurses5-dev`
+    `sudo apt-get install build-essential openjdk-8-jdk maven cmake byacc flex automake libtool-bin bison binutils-dev libiberty-dev zip unzip libncurses5-dev curl git ninja-build python`
+    If you are using CentOS, you can run the following command
+    `sudo yum groupinstall 'Development Tools' && sudo yum install maven cmake byacc flex automake libtool bison binutils-devel zip unzip ncurses-devel curl git wget python2 glibc-static libstdc++-static`
 
     After installation, set the environment variables `PATH`, `JAVA_HOME`, etc. yourself.
     
diff --git a/env.sh b/env.sh
index 6c51bd2..7cf7c0d 100755
--- a/env.sh
+++ b/env.sh
@@ -73,8 +73,9 @@ fi
 
 # check java version
 export JAVA=${JAVA_HOME}/bin/java
-JAVA_VER=$(${JAVA} -version 2>&1 | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q' | cut -f1 -d " ")
-if [[ $JAVA_VER -lt 18 ]]; then
+JAVAP=${JAVA_HOME}/bin/javap
+JAVA_VER=$(${JAVAP} -verbose java.lang.String | grep "major version" | cut -d " " -f5)
+if [[ $JAVA_VER -lt 52 ]]; then
     echo "Error: require JAVA with JDK version at least 1.8"
     exit 1
 fi
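The new `env.sh` check reads the class-file major version of `java.lang.String` from `javap -verbose` instead of parsing the human-readable `java -version` banner, whose format changed with JDK 9. Class-file major version 52 corresponds to Java SE 8, so `-lt 52` rejects anything older than JDK 1.8. The same logic, sketched in Java; the class and method names are hypothetical, and `java.class.version` is the standard system property reporting the running JVM's class-file version:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical Java rendering of the env.sh check; names are illustrative only.
final class JavaVersionCheck {
    /** Class-file major version emitted by Java SE 8 (JDK 1.8). */
    static final int MIN_MAJOR_VERSION = 52;

    private static final Pattern MAJOR_VERSION = Pattern.compile("major version:\\s*(\\d+)");

    /** Extracts N from a javap -verbose line such as "  major version: 52"; returns -1 if absent. */
    static int parseMajorVersion(String javapLine) {
        Matcher m = MAJOR_VERSION.matcher(javapLine);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    /** Class-file major version supported by the running JVM, e.g. 52 for "52.0". */
    static int currentMajorVersion() {
        return Integer.parseInt(System.getProperty("java.class.version").split("\\.")[0]);
    }

    /** Mirrors `if [[ $JAVA_VER -lt 52 ]]`: true means the JDK is new enough. */
    static boolean isAtLeastJdk8(int majorVersion) {
        return majorVersion >= MIN_MAJOR_VERSION;
    }
}
```

For example, `parseMajorVersion("  major version: 52")` returns 52, and `isAtLeastJdk8(51)` (a Java 7 class file) returns false.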
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
index 85c8e18..898f6e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
@@ -51,6 +51,10 @@ public class ImportColumnDesc {
         return expr;
     }
 
+    public void setExpr(Expr expr) {
+        this.expr = expr;
+    }
+
     public boolean isColumn() {
         return expr == null;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 2e491b4..7ec2e14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -107,6 +107,7 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -115,6 +116,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -941,6 +943,7 @@ public class Load {
                                    Map<String, Pair<String, List<String>>> columnToHadoopFunction,
                                    Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
                                    Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
+        rewriteColumns(columnExprs);
         initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer,
                     srcTupleDesc, slotDescByName, params, true);
     }
@@ -958,22 +961,16 @@ public class Load {
             Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
             Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
             boolean needInitSlotAndAnalyzeExprs) throws UserException {
-        // check mapping column exist in schema
-        // !! all column mappings are in columnExprs !!
+        // We make a copy of the columnExprs so that our subsequent changes
+        // to the columnExprs will not affect the original columnExprs.
+        // Skip mapping columns that do not exist in the schema.
+        List<ImportColumnDesc> copiedColumnExprs = new ArrayList<>();
         for (ImportColumnDesc importColumnDesc : columnExprs) {
-            if (importColumnDesc.isColumn()) {
-                continue;
-            }
-
             String mappingColumnName = importColumnDesc.getColumnName();
-            if (tbl.getColumn(mappingColumnName) == null) {
-                throw new DdlException("Mapping column is not in table. column: " + mappingColumnName);
+            if (importColumnDesc.isColumn() || tbl.getColumn(mappingColumnName) != null) {
+                copiedColumnExprs.add(importColumnDesc);
             }
         }
-
-        // We make a copy of the columnExprs so that our subsequent changes
-        // to the columnExprs will not affect the original columnExprs.
-        List<ImportColumnDesc> copiedColumnExprs = Lists.newArrayList(columnExprs);
         // check whether the OlapTable has sequenceCol
         boolean hasSequenceCol = false;
         if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) {
@@ -1133,6 +1130,44 @@ public class Load {
         LOG.debug("after init column, exprMap: {}", exprsByName);
     }
 
+    public static void rewriteColumns(List<ImportColumnDesc> columnExprs) {
+        Map<String, Expr> derivativeColumns = new HashMap<>();
+        // find and rewrite the derivative columns
+        // e.g. (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1)
+        // 1. find the derivative columns
+        for (ImportColumnDesc importColumnDesc : columnExprs) {
+            if (!importColumnDesc.isColumn()) {
+                if (importColumnDesc.getExpr() instanceof SlotRef) {
+                    String columnName = ((SlotRef) importColumnDesc.getExpr()).getColumnName();
+                    if (derivativeColumns.containsKey(columnName)) {
+                        importColumnDesc.setExpr(derivativeColumns.get(columnName));
+                    }
+                } else {
+                    recursiveRewrite(importColumnDesc.getExpr(), derivativeColumns);
+                }
+                derivativeColumns.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
+            }
+        }
+
+    }
+
+    private static void recursiveRewrite(Expr expr, Map<String, Expr> derivativeColumns) {
+        if (CollectionUtils.isEmpty(expr.getChildren())) {
+            return;
+        }
+        for (int i = 0; i < expr.getChildren().size(); i++) {
+            Expr e = expr.getChild(i);
+            if (e instanceof SlotRef) {
+                String columnName = ((SlotRef) e).getColumnName();
+                if (derivativeColumns.containsKey(columnName)) {
+                    expr.setChild(i, derivativeColumns.get(columnName));
+                }
+            } else {
+                recursiveRewrite(e, derivativeColumns);
+            }
+        }
+    }
+
     /**
      * This method is used to transform hadoop function.
      * The hadoop function includes: replace_value, strftime, time_format, alignment_timestamp, default_value, now.
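The transitive rewrite performed by `rewriteColumns` and `recursiveRewrite` can be reproduced outside Doris with a toy expression tree. This is a hedged approximation: `Expr`, `SlotRef`, `IntLiteral`, `Add`, `ColumnDesc` and `DerivedColumnRewriter` are hypothetical stand-ins, not the real `org.apache.doris.analysis` classes, and only addition is modelled. Like the diff, it walks the column list once, replaces any reference to an already-seen derived column with that column's (already rewritten) expression, and records each derived column only after processing it, so forward references are left untouched.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal stand-ins for Doris's Expr / SlotRef / ImportColumnDesc.
abstract class Expr {
    final List<Expr> children = new ArrayList<>();
}

final class SlotRef extends Expr {
    final String name;
    SlotRef(String name) { this.name = name; }
    @Override public String toString() { return "`" + name + "`"; }
}

final class IntLiteral extends Expr {
    final int value;
    IntLiteral(int value) { this.value = value; }
    @Override public String toString() { return Integer.toString(value); }
}

final class Add extends Expr {
    Add(Expr left, Expr right) { children.add(left); children.add(right); }
    @Override public String toString() { return children.get(0) + " + " + children.get(1); }
}

/** A column entry: a plain source column (expr == null) or "name = expr". */
final class ColumnDesc {
    final String name;
    Expr expr;
    ColumnDesc(String name, Expr expr) { this.name = name; this.expr = expr; }
    boolean isColumn() { return expr == null; }
}

final class DerivedColumnRewriter {
    /** Inlines earlier derived columns, e.g. (v1, v2=v1+1, v3=v2+1) -> v3=v1+1+1. */
    static void rewriteColumns(List<ColumnDesc> columns) {
        Map<String, Expr> derived = new HashMap<>();
        for (ColumnDesc desc : columns) {
            if (desc.isColumn()) {
                continue; // plain source columns are never substituted
            }
            if (desc.expr instanceof SlotRef) {
                // The whole expression is a bare reference: replace it outright.
                Expr def = derived.get(((SlotRef) desc.expr).name);
                if (def != null) {
                    desc.expr = def;
                }
            } else {
                rewriteChildren(desc.expr, derived);
            }
            // Registered after processing, so only earlier columns are visible.
            derived.put(desc.name, desc.expr);
        }
    }

    private static void rewriteChildren(Expr expr, Map<String, Expr> derived) {
        for (int i = 0; i < expr.children.size(); i++) {
            Expr child = expr.children.get(i);
            if (child instanceof SlotRef) {
                Expr def = derived.get(((SlotRef) child).name);
                if (def != null) {
                    expr.children.set(i, def);
                }
            } else {
                rewriteChildren(child, derived);
            }
        }
    }
}
```

Fed the columns from `testRewrite`, `c1,c2,c3,tmp_c4=c1 + 1, tmp_c5 = tmp_c4+1`, the expression of `tmp_c5` prints as "`c1` + 1 + 1"; with the two derived columns swapped, `tmp_c5` stays "`tmp_c4` + 1". As in the diff, substituted subtrees are shared between columns rather than cloned.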
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
index a8500ab..ba83147 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
@@ -23,7 +23,9 @@ import org.apache.doris.catalog.SparkResource;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.mysql.privilege.PaloAuth;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -35,7 +37,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import mockit.Expectations;
 import mockit.Injectable;
@@ -134,4 +140,31 @@ public class LoadStmtTest {
 
         Assert.fail("No exception throws.");
     }
+
+    @Test
+    public void testRewrite() throws Exception {
+        List<ImportColumnDesc> columns1 = getColumns("c1,c2,c3,tmp_c4=c1 + 1, tmp_c5 = tmp_c4+1");
+        Load.rewriteColumns(columns1);
+        String orig = "`c1` + 1 + 1";
+        Assert.assertEquals(orig, columns1.get(4).getExpr().toString());
+
+        List<ImportColumnDesc> columns2 = getColumns("c1,c2,c3,tmp_c5 = tmp_c4+1, tmp_c4=c1 + 1");
+        String orig2 = "`tmp_c4` + 1";
+        Load.rewriteColumns(columns2);
+        Assert.assertEquals(orig2, columns2.get(3).getExpr().toString());
+
+        List<ImportColumnDesc> columns3 = getColumns("c1,c2,c3");
+        String orig3 = "c3";
+        Load.rewriteColumns(columns3);
+        Assert.assertEquals(orig3, columns3.get(2).toString());
+
+    }
+
+    private List<ImportColumnDesc> getColumns(String columns) throws Exception {
+        String columnsSQL = "COLUMNS (" + columns + ")";
+        return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt(
+            new SqlParser(
+                new SqlScanner(
+                    new StringReader(columnsSQL))))).getColumns();
+    }
 }
\ No newline at end of file
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 18d5523..4c4a8c7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -674,17 +674,6 @@ public class StreamLoadScanNodeTest {
         scanNode.toThrift(planNode);
     }
 
-    @Test(expected = DdlException.class)
-    public void testLoadInitColumnsMappingColumnNotExist() throws UserException {
-        List<Column> columns = Lists.newArrayList();
-        columns.add(new Column("c1", Type.INT, true, null, false, null, ""));
-        columns.add(new Column("c2", ScalarType.createVarchar(10), true, null, false, null, ""));
-        Table table = new Table(1L, "table0", TableType.OLAP, columns);
-        List<ImportColumnDesc> columnExprs = Lists.newArrayList();
-        columnExprs.add(new ImportColumnDesc("c3", new FunctionCallExpr("func", Lists.newArrayList())));
-        Load.initColumns(table, columnExprs, null, null, null, null, null, null);
-    }
-
     @Test
     public void testSequenceColumnWithSetColumns() throws UserException {
         Analyzer analyzer = new Analyzer(catalog, connectContext);
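The test removed above expected `initColumns` to throw a `DdlException` when a mapping targets a column missing from the table. After this commit such mappings are skipped instead, presumably because temporary variables (for example `temp` in the column-mapping docs) have already been inlined by `rewriteColumns`, which the diff calls before `initColumns` in one of its overloads. A minimal sketch of the new filtering step follows; `MappingColumnFilter` and its nested `ColumnDesc` are hypothetical, and the schema is modelled as a plain `Set<String>` with exact, case-sensitive matching, which is an assumption (the real `Table.getColumn` lookup may behave differently).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class MappingColumnFilter {
    /** Hypothetical stand-in for ImportColumnDesc: a plain column when expr is null. */
    static final class ColumnDesc {
        final String name;
        final String expr; // textual expression; null for a plain source column

        ColumnDesc(String name, String expr) {
            this.name = name;
            this.expr = expr;
        }

        boolean isColumn() {
            return expr == null;
        }
    }

    /**
     * Returns a new list with every plain column plus every mapping whose target exists in
     * the table schema. Mappings to unknown columns are dropped instead of raising an
     * error, and the input list is left unmodified.
     */
    static List<ColumnDesc> keepLoadableColumns(List<ColumnDesc> columnExprs, Set<String> tableColumns) {
        List<ColumnDesc> copied = new ArrayList<>();
        for (ColumnDesc desc : columnExprs) {
            if (desc.isColumn() || tableColumns.contains(desc.name)) {
                copied.add(desc);
            }
        }
        return copied;
    }
}
```

Building a fresh list, as the diff does with `copiedColumnExprs`, keeps the caller's `columnExprs` intact for any later processing.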

