You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/01/12 02:55:03 UTC

[incubator-doris] branch master updated: Fix daily test failed cause by delete condition (#5211)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 851d452  Fix daily test failed cause by delete condition (#5211)
851d452 is described below

commit 851d45216e7bef66f467b426e520327df0ec48e1
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Tue Jan 12 10:54:44 2021 +0800

    Fix daily test failed cause by delete condition (#5211)
    
    * fix daily test failed
---
 be/CMakeLists.txt                                  |  2 +-
 conf/fe.conf                                       |  2 +-
 .../org/apache/doris/analysis/DataDescription.java | 10 ----------
 .../doris/load/routineload/RoutineLoadJob.java     | 11 +----------
 .../apache/doris/planner/StreamLoadScanNode.java   | 22 ++++++++++++++++++----
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  1 +
 .../java/org/apache/doris/task/StreamLoadTask.java | 15 +++++----------
 7 files changed, 27 insertions(+), 36 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 9bea59f..af30e50 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -66,8 +66,8 @@ option(WITH_MYSQL "Support access MySQL" ON)
 if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
     if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "7.3.0")
         message(FATAL_ERROR "Need GCC version at least 7.3.0")
-        message(STATUS "GCC version is greater than 7.3.0, disable -Werror. Be careful with compile warnings.")
     endif()
+    message(STATUS "GCC version is greater than 7.3.0, disable -Werror. Be careful with compile warnings.")
 elseif (NOT APPLE)
     message(FATAL_ERROR "Compiler should be GNU")
 endif()
diff --git a/conf/fe.conf b/conf/fe.conf
index 7d59cd7..9298619 100644
--- a/conf/fe.conf
+++ b/conf/fe.conf
@@ -37,7 +37,7 @@ JAVA_OPTS_FOR_JDK_9="-Xmx4096m -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -X
 # INFO, WARN, ERROR, FATAL
 sys_log_level = INFO
 
-# store metadata, create it if it is not exist.
+# store metadata, must be created before start FE.
 # Default value is ${DORIS_HOME}/doris-meta
 # meta_dir = ${DORIS_HOME}/doris-meta
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 577aa99..ccb2217 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -796,16 +796,6 @@ public class DataDescription {
         analyzeColumns();
         analyzeMultiLoadColumns();
         analyzeSequenceCol(fullDbName);
-        if (mergeType == LoadTask.MergeType.MERGE) {
-            parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
-        } else if (mergeType == LoadTask.MergeType.DELETE) {
-            parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
-        }
-        // add columnExpr for sequence column
-        if (hasSequenceCol()) {
-            parsedColumnExprList.add(new ImportColumnDesc(Column.SEQUENCE_COL,
-                    new SlotRef(null, getSequenceCol())));
-        }
     }
 
     /*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 3824df9..80d4954 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -23,14 +23,11 @@ import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.ImportColumnsStmt;
-import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
@@ -367,15 +364,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 deleteCondition = routineLoadDesc.getDeleteCondition();
             }
             mergeType = routineLoadDesc.getMergeType();
-            if (mergeType == LoadTask.MergeType.MERGE) {
-                columnDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
-            } else if (mergeType == LoadTask.MergeType.DELETE) {
-                columnDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
-            }
             if (routineLoadDesc.hasSequenceCol()) {
                 sequenceCol = routineLoadDesc.getSequenceColName();
-                // add expr for sequence column
-                columnDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
             }
         }
     }
@@ -600,6 +590,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         return value;
     }
 
+    @Override
     public String getSequenceCol() {
         return sequenceCol;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index b31578b..c79dcf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -19,11 +19,16 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TBrokerRangeDesc;
 import org.apache.doris.thrift.TBrokerScanRange;
@@ -81,6 +86,9 @@ public class StreamLoadScanNode extends LoadScanNode {
 
         this.analyzer = analyzer;
         brokerScanRange = new TBrokerScanRange();
+        
+        deleteCondition = taskInfo.getDeleteCondition();
+        mergeType = taskInfo.getMergeType();
 
         TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
         rangeDesc.file_type = taskInfo.getFileType();
@@ -115,16 +123,22 @@ public class StreamLoadScanNode extends LoadScanNode {
         srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode");
 
         TBrokerScanRangeParams params = new TBrokerScanRangeParams();
+        List<ImportColumnDesc> columnExprDescs = taskInfo.getColumnExprDescs();
+        if (mergeType == LoadTask.MergeType.MERGE) {
+            columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
+        }  else if (mergeType == LoadTask.MergeType.DELETE) {
+            columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
+        }
+        if (taskInfo.hasSequenceCol()) {
+            columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, taskInfo.getSequenceCol())));
+        }
 
-        Load.initColumns(dstTable, taskInfo.getColumnExprDescs(), null /* no hadoop function */,
+        Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */,
                 exprsByName, analyzer, srcTupleDesc, slotDescByName, params);
 
         // analyze where statement
         initWhereExpr(taskInfo.getWhereExpr(), analyzer);
 
-        deleteCondition = taskInfo.getDeleteCondition();
-        mergeType = taskInfo.getMergeType();
-
         computeStats(analyzer);
         createDefaultSmap(analyzer);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index ec335e1..7b1cf78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -37,6 +37,7 @@ public interface LoadTaskInfo {
     public LoadTask.MergeType getMergeType();
     public Expr getDeleteCondition();
     public boolean hasSequenceCol();
+    public String getSequenceCol();
     public TFileType getFileType();
     public TFileFormatType getFormatType();
     public String getJsonPaths();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 7ccdd46..626d92d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -25,12 +25,9 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.ImportColumnsStmt;
 import org.apache.doris.analysis.ImportWhereStmt;
-import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -194,6 +191,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return !Strings.isNullOrEmpty(sequenceCol);
     }
 
+    @Override
+    public String getSequenceCol() {
+        return sequenceCol;
+    }
+
     public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) throws UserException {
         StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
                                                            request.getFileType(), request.getFormatType());
@@ -265,15 +267,8 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (negative && mergeType != LoadTask.MergeType.APPEND) {
             throw new AnalysisException("Negative is only used when merge type is APPEND.");
         }
-        if (mergeType == LoadTask.MergeType.MERGE) {
-            columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
-        }  else if (mergeType == LoadTask.MergeType.DELETE) {
-            columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
-        }
         if (request.isSetSequenceCol()) {
             sequenceCol = request.getSequenceCol();
-            // add expr for sequence column
-            columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org