You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/01/12 02:55:03 UTC
[incubator-doris] branch master updated: Fix daily test failed cause by delete condition (#5211)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 851d452 Fix daily test failed cause by delete condition (#5211)
851d452 is described below
commit 851d45216e7bef66f467b426e520327df0ec48e1
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Tue Jan 12 10:54:44 2021 +0800
Fix daily test failed cause by delete condition (#5211)
* fix daily test failed
---
be/CMakeLists.txt | 2 +-
conf/fe.conf | 2 +-
.../org/apache/doris/analysis/DataDescription.java | 10 ----------
.../doris/load/routineload/RoutineLoadJob.java | 11 +----------
.../apache/doris/planner/StreamLoadScanNode.java | 22 ++++++++++++++++++----
.../java/org/apache/doris/task/LoadTaskInfo.java | 1 +
.../java/org/apache/doris/task/StreamLoadTask.java | 15 +++++----------
7 files changed, 27 insertions(+), 36 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 9bea59f..af30e50 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -66,8 +66,8 @@ option(WITH_MYSQL "Support access MySQL" ON)
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "7.3.0")
message(FATAL_ERROR "Need GCC version at least 7.3.0")
- message(STATUS "GCC version is greater than 7.3.0, disable -Werror. Be careful with compile warnings.")
endif()
+ message(STATUS "GCC version is greater than 7.3.0, disable -Werror. Be careful with compile warnings.")
elseif (NOT APPLE)
message(FATAL_ERROR "Compiler should be GNU")
endif()
diff --git a/conf/fe.conf b/conf/fe.conf
index 7d59cd7..9298619 100644
--- a/conf/fe.conf
+++ b/conf/fe.conf
@@ -37,7 +37,7 @@ JAVA_OPTS_FOR_JDK_9="-Xmx4096m -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -X
# INFO, WARN, ERROR, FATAL
sys_log_level = INFO
-# store metadata, create it if it is not exist.
+# store metadata, must be created before start FE.
# Default value is ${DORIS_HOME}/doris-meta
# meta_dir = ${DORIS_HOME}/doris-meta
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 577aa99..ccb2217 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -796,16 +796,6 @@ public class DataDescription {
analyzeColumns();
analyzeMultiLoadColumns();
analyzeSequenceCol(fullDbName);
- if (mergeType == LoadTask.MergeType.MERGE) {
- parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
- } else if (mergeType == LoadTask.MergeType.DELETE) {
- parsedColumnExprList.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
- }
- // add columnExpr for sequence column
- if (hasSequenceCol()) {
- parsedColumnExprList.add(new ImportColumnDesc(Column.SEQUENCE_COL,
- new SlotRef(null, getSequenceCol())));
- }
}
/*
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 3824df9..80d4954 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -23,14 +23,11 @@ import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
-import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
@@ -367,15 +364,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
deleteCondition = routineLoadDesc.getDeleteCondition();
}
mergeType = routineLoadDesc.getMergeType();
- if (mergeType == LoadTask.MergeType.MERGE) {
- columnDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
- } else if (mergeType == LoadTask.MergeType.DELETE) {
- columnDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
- }
if (routineLoadDesc.hasSequenceCol()) {
sequenceCol = routineLoadDesc.getSequenceColName();
- // add expr for sequence column
- columnDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
}
}
}
@@ -600,6 +590,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return value;
}
+ @Override
public String getSequenceCol() {
return sequenceCol;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index b31578b..c79dcf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -19,11 +19,16 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerScanRange;
@@ -81,6 +86,9 @@ public class StreamLoadScanNode extends LoadScanNode {
this.analyzer = analyzer;
brokerScanRange = new TBrokerScanRange();
+
+ deleteCondition = taskInfo.getDeleteCondition();
+ mergeType = taskInfo.getMergeType();
TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc();
rangeDesc.file_type = taskInfo.getFileType();
@@ -115,16 +123,22 @@ public class StreamLoadScanNode extends LoadScanNode {
srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode");
TBrokerScanRangeParams params = new TBrokerScanRangeParams();
+ List<ImportColumnDesc> columnExprDescs = taskInfo.getColumnExprDescs();
+ if (mergeType == LoadTask.MergeType.MERGE) {
+ columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
+ } else if (mergeType == LoadTask.MergeType.DELETE) {
+ columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
+ }
+ if (taskInfo.hasSequenceCol()) {
+ columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, taskInfo.getSequenceCol())));
+ }
- Load.initColumns(dstTable, taskInfo.getColumnExprDescs(), null /* no hadoop function */,
+ Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */,
exprsByName, analyzer, srcTupleDesc, slotDescByName, params);
// analyze where statement
initWhereExpr(taskInfo.getWhereExpr(), analyzer);
- deleteCondition = taskInfo.getDeleteCondition();
- mergeType = taskInfo.getMergeType();
-
computeStats(analyzer);
createDefaultSmap(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index ec335e1..7b1cf78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -37,6 +37,7 @@ public interface LoadTaskInfo {
public LoadTask.MergeType getMergeType();
public Expr getDeleteCondition();
public boolean hasSequenceCol();
+ public String getSequenceCol();
public TFileType getFileType();
public TFileFormatType getFormatType();
public String getJsonPaths();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 7ccdd46..626d92d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -25,12 +25,9 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.ImportWhereStmt;
-import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -194,6 +191,11 @@ public class StreamLoadTask implements LoadTaskInfo {
return !Strings.isNullOrEmpty(sequenceCol);
}
+ @Override
+ public String getSequenceCol() {
+ return sequenceCol;
+ }
+
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request, Database db) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType());
@@ -265,15 +267,8 @@ public class StreamLoadTask implements LoadTaskInfo {
if (negative && mergeType != LoadTask.MergeType.APPEND) {
throw new AnalysisException("Negative is only used when merge type is APPEND.");
}
- if (mergeType == LoadTask.MergeType.MERGE) {
- columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
- } else if (mergeType == LoadTask.MergeType.DELETE) {
- columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
- }
if (request.isSetSequenceCol()) {
sequenceCol = request.getSequenceCol();
- // add expr for sequence column
- columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, sequenceCol)));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org