You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/11/05 01:43:44 UTC

[incubator-doris] branch master updated: Fix hadoop load failed when enable batch delete in unique table (#6996)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 995fa99  Fix hadoop load failed when enable batch delete in unique table (#6996)
995fa99 is described below

commit 995fa992f75eda2e1da429fd79c39704b9f1dc38
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Fri Nov 5 09:43:28 2021 +0800

    Fix hadoop load failed when enable batch delete in unique table (#6996)
---
 .../src/main/java/org/apache/doris/load/Load.java  | 23 ++++++++++++++++++++++
 .../apache/doris/task/HadoopLoadPendingTask.java   |  4 +---
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 9469e4c..4e0d4d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -83,6 +83,7 @@ import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.LoadJob.JobState;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.ReplicaPersistInfo;
@@ -581,6 +582,11 @@ public class Load {
                 // set default timeout
                 job.setTimeoutSecond(Config.hadoop_load_default_timeout_second);
             }
+            for (DataDescription dataDescription : dataDescriptions) {
+                if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND) {
+                    throw new DdlException("MERGE OR DELETE is not supported in hadoop load.");
+                }
+            }
         } else if (etlJobType == EtlJobType.BROKER) {
             if (job.getTimeoutSecond() == 0) {
                 // set default timeout
@@ -758,6 +764,23 @@ public class Load {
                         // do nothing
                     }
 
+                } else if (!column.isVisible()) {
+                    /*
+                     * For a batch-delete table, add the hidden column __DORIS_DELETE_SIGN__ to the columns,
+                     * e.g.:
+                     * (A, B, C)
+                     * ->
+                     * (A, B, C) SET (__DORIS_DELETE_SIGN__ = 0)
+                     */
+                    columnToHadoopFunction.put(column.getName(), Pair.create("default_value", Lists.newArrayList(column.getDefaultValue())));
+                    ImportColumnDesc importColumnDesc = null;
+                    try {
+                        importColumnDesc = new ImportColumnDesc(column.getName(),
+                                new FunctionCallExpr("default_value", Arrays.asList(column.getDefaultValueExpr())));
+                    } catch (AnalysisException e) {
+                        throw new DdlException(e.getMessage());
+                    }
+                    parsedColumnExprList.add(importColumnDesc);
                 }
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java
index cffd772..e1944eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java
@@ -169,7 +169,7 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
 
     private Map<String, EtlColumn> createEtlColumns(OlapTable table) {
         Map<String, EtlColumn> etlColumns = Maps.newHashMap();
-        for (Column column : table.getBaseSchema()) {
+        for (Column column : table.getBaseSchema(true)) {
             etlColumns.put(column.getName(), new EtlColumn(column));
         }
         return etlColumns;
@@ -225,7 +225,6 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
                 }
                 columnRefs.add(dppColumn);
             }
-
             // distribution infos
             DistributionInfo distributionInfo = partition.getDistributionInfo();
             List<String> distributionColumnRefs = Lists.newArrayList();
@@ -266,7 +265,6 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
                     LOG.warn("unknown distribution type. type: {}", distributionInfo.getType().name());
                     throw new LoadException("unknown distribution type. type: " + distributionInfo.getType().name());
             }
-
             etlIndex.setPidKeyCount(keySize);
             etlIndex.setColumnRefs(columnRefs);
             etlIndices.put(String.valueOf(indexId), etlIndex);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org