Posted to commits@doris.apache.org by zh...@apache.org on 2019/09/17 03:32:45 UTC

[incubator-doris] branch master updated: Add where expr in broker load (#1812)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 054a3f4  Add where expr in broker load (#1812)
054a3f4 is described below

commit 054a3f48bcbe280dd39c2154cd50017d8a90ff88
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Tue Sep 17 11:32:40 2019 +0800

    Add where expr in broker load (#1812)
    
    The where predicate in broker load is responsible for filtering transformed data.
    The help and operator docs have been updated accordingly.
---
 be/src/runtime/fragment_mgr.cpp                    |   3 +
 .../load-data/broker-load-manual.md                |  20 +-
 .../Data Manipulation/BROKER LOAD.md               |  14 +-
 .../load-data/broker-load-manual_EN.md             |   9 +-
 .../Data Manipulation/broker_load_EN.md            | 688 ++++++++++-----------
 fe/src/main/cup/sql_parser.cup                     |  10 +-
 .../org/apache/doris/analysis/DataDescription.java |  11 +-
 .../org/apache/doris/load/BrokerFileGroup.java     |   7 +
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   8 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  17 +-
 .../org/apache/doris/load/loadv2/LoadManager.java  |   4 +-
 .../org/apache/doris/planner/BrokerScanNode.java   |  11 +-
 .../org/apache/doris/planner/LoadScanNode.java     |  75 +++
 .../apache/doris/planner/StreamLoadScanNode.java   |  35 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  13 +-
 15 files changed, 503 insertions(+), 422 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7d6ac63..2049e70 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -288,11 +288,14 @@ void FragmentExecState::coordinator_callback(
             // TODO(zc)
             static std::string s_dpp_normal_all = "dpp.norm.ALL";
             static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
+            static std::string s_unselected_rows = "unselected.rows";
     
             params.load_counters.emplace(
                 s_dpp_normal_all, std::to_string(runtime_state->num_rows_load_success()));
             params.load_counters.emplace(
                 s_dpp_abnormal_all, std::to_string(runtime_state->num_rows_load_filtered()));
+            params.load_counters.emplace(
+                s_unselected_rows, std::to_string(runtime_state->num_rows_load_unselected()));
         }
         if (!runtime_state->get_error_log_file_path().empty()) {
             params.__set_tracking_url(
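
The counters emitted above end up in the EtlInfo field of SHOW LOAD, for example "unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376". As a minimal, hedged illustration (class and method names here are made up, not part of Doris), a client could parse such a counter string and check that the three counters account for every scanned source row:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative helper only: parses an EtlInfo-style counter string.
    public class EtlInfoParser {
        public static Map<String, Long> parse(String etlInfo) {
            Map<String, Long> counters = new HashMap<>();
            for (String pair : etlInfo.split(";")) {
                String[] kv = pair.trim().split("=");
                if (kv.length == 2) {
                    counters.put(kv[0].trim(), Long.parseLong(kv[1].trim()));
                }
            }
            return counters;
        }

        public static void main(String[] args) {
            Map<String, Long> c = parse("unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376");
            // With this change, the three counters together cover all scanned source rows.
            long total = c.get("unselected.rows") + c.get("dpp.abnorm.ALL") + c.get("dpp.norm.ALL");
            System.out.println("total scanned rows = " + total);
        }
    }
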
diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
index 194357f..a50c8c6 100644
--- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md
@@ -78,6 +78,7 @@ WITH BROKER broker_name broker_properties
     [COLUMNS TERMINATED BY separator ]
     [(col1, ...)]
     [SET (k1=f1(xx), k2=f2(xx))]
+    [WHERE predicate]
 
 * broker_properties: 
 
@@ -101,6 +102,7 @@ LOAD LABEL db1.label1
     INTO TABLE tbl2
     COLUMNS TERMINATED BY ","
     (col1, col2)
+    where col1 > 1
 )
 WITH BROKER 'broker'
 (
@@ -146,6 +148,10 @@ Label 的另一个作用,是防止用户重复导入相同的数据。**强烈
 
    The SET clause in ```data_desc``` specifies column function transformations; the transformations here support any equivalent expression transformation available in queries. Use it when the columns of the source data do not correspond one-to-one to the columns of the table.
 
++ where predicate
+
+    The WHERE clause in ```data_desc``` filters data that has already been transformed; rows filtered out by it are not counted toward the error tolerance statistics. If multiple data_desc declare conditions on the same table, those conditions are merged, and the merge policy is AND.
+
 #### Load job parameters
 
 Load job parameters are the parameters in the ```opt_properties``` part of the Broker load statement. They apply to the entire load job.
@@ -246,7 +252,7 @@ mysql> show load order by createtime desc limit 1\G
          State: FINISHED
       Progress: ETL:N/A; LOAD:100%
           Type: BROKER
-       EtlInfo: dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
+       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
       TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
       ErrorMsg: N/A
     CreateTime: 2019-07-27 11:46:42
@@ -255,7 +261,7 @@ mysql> show load order by createtime desc limit 1\G
  LoadStartTime: 2019-07-27 11:46:44
 LoadFinishTime: 2019-07-27 11:50:16
            URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
-    LoadedRows: 82393000
+    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
 ```
 
 The following describes the meaning of the parameters in the result set returned by the show load command:
@@ -292,7 +298,9 @@ LoadFinishTime: 2019-07-27 11:50:16
 
 + EtlInfo
 
-    Mainly shows the load volume metrics ```dpp.norm.ALL``` and ```dpp.abnorm.ALL```. Users can use these two metrics to verify whether the error rate of the current load job exceeds max\_filter\_ratio.
+    Mainly shows the load volume metrics ```unselected.rows```, ```dpp.norm.ALL``` and ```dpp.abnorm.ALL```. The first value indicates how many rows were filtered out by the WHERE condition; the latter two are used to verify whether the error rate of the current load job exceeds max\_filter\_ratio.
+
+    The sum of these three metrics is the total number of rows in the original source data.
     
 + TaskInfo
 
@@ -332,11 +340,11 @@ LoadFinishTime: 2019-07-27 11:50:16
 
 + JobDetails
 
-    Shows detailed runtime information about the job, including the number of files to load, total size (in bytes), number of subtasks, number of rows processed, etc.
+    Shows detailed runtime information about the job, including the number of files to load, total size (in bytes), number of subtasks, number of raw source rows processed, etc.
 
-    ```{"LoadedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}```
+    ```{"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}```
 
-    The number of processed rows is updated every 5 seconds. It only reflects current progress and is not the final number of processed rows; the final count is the one shown in EtlInfo.
+    The number of processed raw source rows is updated every 5 seconds. It only reflects current progress and is not the final number of processed rows; the final count is the one shown in EtlInfo.
 
 ### Cancel load
 
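
To make the relationship between the three metrics concrete: rows dropped by WHERE (unselected.rows) are not data-quality errors, so only dpp.abnorm.ALL relative to the rows that actually entered loading is compared against max_filter_ratio. A hedged Java sketch of one way to express that check (names are illustrative, not Doris internals):

    // Illustrative check only: does a load exceed max_filter_ratio?
    public class FilterRatioCheck {
        static boolean exceedsFilterRatio(long normRows, long abnormRows, long unselectedRows,
                                          double maxFilterRatio) {
            // unselectedRows are excluded: they were filtered by WHERE, not rejected for quality.
            long candidateRows = normRows + abnormRows;
            if (candidateRows == 0) {
                return false;
            }
            return (double) abnormRows / candidateRows > maxFilterRatio;
        }

        public static void main(String[] args) {
            // Counters from the sample SHOW LOAD output above.
            System.out.println(exceedsFilterRatio(28133376L, 15L, 4L, 5.0E-5)); // prints false
        }
    }
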
diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index aca925d..e9210a3 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
@@ -41,7 +41,8 @@
             [FORMAT AS "file_type"]
             [(column_list)]
             [SET (k1 = func(k2))]
-    
+            [WHERE predicate]    
+
        Explanation:
             file_path: 
 
@@ -80,6 +81,9 @@
            Example 2: The table has three columns "year, month, day", while the source file contains only one time column in the format "2018-06-01 01:02:03".
            You can then specify columns(tmp_time) set (year = year(tmp_time), month=month(tmp_time), day=day(tmp_time)) to complete the load.
 
+            WHERE:
+          
+            Filters the transformed data; only rows that satisfy the WHERE condition are loaded. The WHERE clause may only reference column names of the table.
     3. broker_name
 
        The name of the broker to use, which can be viewed with the show broker command.
@@ -321,6 +325,14 @@
         (k1, k2, k3)
         )
         WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+     9. Filter the data to be loaded: only rows whose k1 value is greater than their k2 value are loaded
+        LOAD LABEL example_db.label10
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        where k1 > k2
+        )
      
 ## keyword
     BROKER,LOAD
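
The key point in the WHERE description above is ordering: SET transformations run first, and WHERE is then evaluated against the transformed values, which is why it may only reference the table's column names. A small self-contained Java sketch of that pipeline, with hypothetical column names:

    import java.util.List;
    import java.util.stream.Collectors;

    // Illustrative only: a source row is transformed (SET) and then filtered (WHERE).
    public class TransformThenFilter {
        record SourceRow(long tmpTime) {}          // raw column from the source file
        record TableRow(int year, int month) {}    // columns of the target table

        public static void main(String[] args) {
            List<SourceRow> source = List.of(new SourceRow(1537002087L), new SourceRow(946684800L));
            List<TableRow> loaded = source.stream()
                    // SET (year = year(tmp_time), month = month(tmp_time)): transform first
                    .map(r -> {
                        var t = java.time.Instant.ofEpochSecond(r.tmpTime())
                                .atZone(java.time.ZoneOffset.UTC);
                        return new TableRow(t.getYear(), t.getMonthValue());
                    })
                    // WHERE year > 2017: evaluated on the transformed table columns, not on tmp_time
                    .filter(row -> row.year() > 2017)
                    .collect(Collectors.toList());
            System.out.println(loaded); // only the 2018 row survives
        }
    }
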
diff --git a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
index 5149ca6..eb164df 100644
--- a/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
+++ b/docs/documentation/en/administrator-guide/load-data/broker-load-manual_EN.md
@@ -78,6 +78,7 @@ WITH BROKER broker_name broker_properties
     [COLUMNS TERMINATED BY separator ]
     [(col1, ...)]
     [SET (k1=f1(xx), k2=f2(xx))]
+    [WHERE predicate]
 
 * broker_properties: 
 
@@ -101,6 +102,7 @@ LOAD LABEL db1.label1
     INTO TABLE tbl2
     COLUMNS TERMINATED BY ","
     (col1, col2)
+    where col1 > 1
 )
 WITH BROKER 'broker'
 (
@@ -142,6 +144,10 @@ The following is a detailed explanation of some parameters of the data descripti
 
 	In `data_desc`, you can specify the partitions of the table to be loaded. Data that does not belong to the specified partitions will not be imported and is counted as error data.
 
++ where predicate
+
+        The WHERE clause in ```data_desc``` filters data that has already been transformed. Rows filtered out by the WHERE predicate are not counted toward ```max_filter_ratio```. If more than one WHERE predicate is declared for the same table in different ```data_desc```, the predicates are merged, and the merge policy is AND.
+
 #### Import job parameters
 
 Import job parameters are the parameters in the ``opt_properties`` section of the Broker load statement. They apply to the whole import job.
@@ -247,6 +253,7 @@ mysql> show load order by createtime desc limit 1\G
  LoadStartTime: 2019-07-27 11:46:44
 LoadFinishTime: 2019-07-27 11:50:16
            URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
+    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
 ```
 
 The following describes the meaning of the parameters in the result set returned by the show load command:
@@ -282,7 +289,7 @@ The following is mainly about the significance of viewing the parameters in the
 	Types of import tasks. The type value of Broker load is only BROKER.
 + Etlinfo
 
-	It mainly shows the imported data volume metrics `dpp.norm.ALL` and `dpp.abnorm.ALL`. Users can verify whether the error rate of the current import task exceeds max\_filter\_ratio based on these two metrics.
+	It mainly shows the imported data volume metrics `unselected.rows`, `dpp.norm.ALL` and `dpp.abnorm.ALL`. The first value shows how many rows were filtered out by the where predicate. Users can verify whether the error rate of the current import task exceeds max\_filter\_ratio based on the latter two metrics.
 
 + TaskInfo
 
diff --git a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md
index 8d5a19c..6c67996 100644
--- a/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md	
+++ b/docs/documentation/en/sql-reference/sql-statements/Data Manipulation/broker_load_EN.md	
@@ -1,366 +1,344 @@
 # BROKER LOAD
-Description
+## Description
 
-Broker load accesses data from corresponding data sources and imports data through broker deployed with Palo cluster.
-You can view the deployed broker through the show broker command.
-The following four data sources are currently supported:
+    Broker load reads data from the corresponding data source and imports it through the brokers deployed with the Palo cluster.
+    You can view the deployed brokers with the show broker command.
+    The following four data sources are currently supported:
 
-1. Baidu HDFS: Baidu's internal HDFS are limited to Baidu's internal use.
-2. Baidu AFS: Baidu's internal AFs are limited to Baidu's internal use.
-3. Baidu Object Storage (BOS): Baidu Object Storage. Only Baidu internal users, public cloud users or other users who can access BOS.
+    1. Baidu HDFS: Baidu's internal HDFS, limited to Baidu internal use.
+    2. Baidu AFS: Baidu's internal AFS, limited to Baidu internal use.
+    3. Baidu Object Storage (BOS): available to Baidu internal users, public cloud users, and other users who can access BOS.
    4. Apache HDFS
 
Syntax:
 
-LOAD LABEL load_label
-(
-Date of date of date of entry
-)
-WITH BROKER broker_name
-[broker_properties]
-[opt_properties];
-
-1. load label
-
-The label of the current imported batch. Unique in a database.
-Grammar:
-[database_name.]your_label
-
-2. data_desc
-
-Used to describe a batch of imported data.
-Grammar:
-DATA INFILE
-(
-"file_path1"[, file_path2, ...]
-)
-[NEGATIVE]
-INTO TABLE `table_name`
-[PARTITION (p1, P2)]
-[COLUMNS TERMINATED BY "column_separator"]
-[FORMAT AS "file_type"]
-[(column_list)]
-[set (k1 = fun (k2)]]
-
-Explain:
-file_path:
-
-File paths can be specified to a file, or * wildcards can be used to specify all files in a directory. Wildcards must match to files, not directories.
-
-PARTICIPATION:
-
-If this parameter is specified, only the specified partition will be imported, and data outside the imported partition will be filtered out.
-If not specified, all partitions of the table are imported by default.
-
-NEGATIVE:
-If this parameter is specified, it is equivalent to importing a batch of "negative" data. Used to offset the same batch of data imported before.
-This parameter applies only to the case where there are value columns and the aggregation type of value columns is SUM only.
-
-Column U separator:
-
-Used to specify the column separator in the import file. Default tot
-If the character is invisible, it needs to be prefixed with \x, using hexadecimal to represent the separator.
-For example, the separator X01 of the hive file is specified as "\ x01"
-
-File type:
-
-Used to specify the type of imported file, such as parquet, csv. The default value is determined by the file suffix name.
-
-column_list:
-
-Used to specify the correspondence between columns in the import file and columns in the table.
-When you need to skip a column in the import file, specify it as a column name that does not exist in the table.
-Grammar:
-(col_name1, col_name2, ...)
-
-SET:
-
-If this parameter is specified, a column of the source file can be transformed according to a function, and then the transformed result can be imported into the table.
-The functions currently supported are:
-
-Strftime (fmt, column) date conversion function
-Fmt: Date format, such as% Y% m% d% H% i% S (year, month, day, hour, second)
-Column: Column in column_list, which is the column in the input file. Storage content should be a digital timestamp.
-If there is no column_list, the columns of the input file are entered by default in the column order of the Palo table.
-Note: The digital timestamp is in seconds.
-
-time_format(output_fmt, input_fmt, column) 日期格式转化
-Output_fmt: Converted date format, such as% Y% m% d% H% i% S (year, month, day, hour, second)
-Input_fmt: The date format of the column before transformation, such as% Y%m%d%H%i%S (year, month, day, hour, second)
-Column: Column in column_list, which is the column in the input file. Storage content should be a date string in input_fmt format.
-If there is no column_list, the columns of the input file are entered by default in the column order of the Palo table.
-
-alignment_timestamp(precision, column) 将时间戳对齐到指定精度
-Precision: year 124month;124day;124hour;
-Column: Column in column_list, which is the column in the input file. Storage content should be a digital timestamp.
-If there is no column_list, the columns of the input file are entered by default in the column order of the Palo table.
-Note: When the alignment accuracy is year and month, only the time stamps in the range of 20050101-20191231 are supported.
-
-Default_value (value) sets the default value for a column import
-Use default values of columns when creating tables without specifying
-
-Md5sum (column1, column2,...) evaluates the value of the specified imported column to md5sum, returning a 32-bit hexadecimal string
-
-Replace_value (old_value [, new_value]) replaces old_value specified in the import file with new_value
-New_value, if not specified, uses the default value of the column when building the table
-
-Hll_hash (column) is used to transform a column in a table or data into a data structure of a HLL column
-
-Now () sets the data imported by a column to the point at which the import executes. The column must be of DATE/DATETIME type.
-
-Three. broker name
-
-The name of the broker used can be viewed through the show broker command.
-
-4. broker_properties
-
-Used to provide information to access data sources through broker. Different brokers, as well as different access methods, need to provide different information.
-
-1. HDFS /AFS Baidu
-
-Access to Baidu's internal hdfs/afs currently only supports simple authentication, which needs to be provided:
-Username: HDFS username
-password -hdfs
-
-2. BOS
-
-Need to provide:
-Bos_endpoint: endpoint of BOS
-Bos_accesskey: Accesskey for public cloud users
-Bos_secret_access key: secret_access key for public cloud users
-
-Three. Apache HDFS
-
-Community version of HDFS supports simple authentication and Kerberos authentication. And support HA configuration.
-Simple authentication:
-hadoop.security.authentication = simple (默认)
-Username: HDFS username
-password -hdfs
-
-Kerberos authentication:
-hadoop.security.authentication = kerberos
-Kerberos_principal: Specifies the principal of Kerberos
-Kerberos_keytab: Specifies the KeyTab file path for kerberos. This file must be a file on the server where the broker process resides.
-Kerberos_keytab_content: Specifies the content of the KeyTab file in Kerberos after base64 encoding. This is a choice from the kerberos_keytab configuration.
-
-HA code
-By configuring namenode HA, new namenode can be automatically identified when the namenode is switched
-Dfs. nameservices: Specify the name of the HDFS service and customize it, such as: "dfs. nameservices" = "my_ha"
-Dfs.ha.namenodes.xxx: Customize the name of the namenode, with multiple names separated by commas. Where XXX is a custom name in dfs. name services, such as "dfs. ha. namenodes. my_ha" = "my_nn"
-Dfs.namenode.rpc-address.xxx.nn: Specify RPC address information for namenode. Where NN denotes the name of the namenode configured in dfs.ha.namenodes.xxx, such as: "dfs.namenode.rpc-address.my_ha.my_nn"= "host:port"
-Dfs.client.failover.proxy.provider: Specifies the provider that client connects to namenode by default: org.apache.hadoop.hdfs.server.namenode.ha.Configured Failover ProxyProvider
-
-4. opt_properties
-
-Used to specify some special parameters.
-Grammar:
-[PROPERTIES ("key"="value", ...)]
-
-The following parameters can be specified:
-Timeout: Specifies the timeout time of the import operation. The default timeout is 4 hours. Unit seconds.
-Max_filter_ratio: The ratio of data that is most tolerant of being filterable (for reasons such as data irregularities). Default zero tolerance.
-Exec_mem_limit: Sets the upper memory limit for import use. Default is 2G, unit byte. This refers to the upper memory limit of a single BE node.
-An import may be distributed across multiple BEs. We assume that processing 1GB data at a single node requires up to 5GB of memory. Assuming that a 1GB file is distributed among two nodes, then theoretically, each node needs 2.5GB of memory. Then the parameter can be set to 268454560, or 2.5GB.
-Strict mode: is there a strict restriction on data? The default is true.
-
-5. Import data format sample
-
-Integer classes (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1,1000,1234
-Floating Point Class (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, 356
-
-(Note: If it's in other date formats, you can use strftime or time_format functions to convert in the import command)
-字符串类(CHAR/VARCHAR):"I am a student", "a"
+    LOAD LABEL load_label
+    (
+    data_desc1[, data_desc2, ...]
+    )
+    WITH BROKER broker_name
+    [broker_properties]
+    [opt_properties];
+
+    1. load_label
+    
+        The label of the current load batch. It must be unique within a database.
+        Syntax:
+        [database_name.]your_label
+    
+    2. data_desc
+    
+        Used to describe a batch of data to be loaded.
+        Syntax:
+            DATA INFILE
+            (
+            "file_path1"[, file_path2, ...]
+            )
+            [NEGATIVE]
+            INTO TABLE `table_name`
+            [PARTITION (p1, p2)]
+            [COLUMNS TERMINATED BY "column_separator"]
+            [FORMAT AS "file_type"]
+            [(column_list)]
+            [SET (k1 = func(k2))]
+            [WHERE predicate]
+    
+        Explanation:
+            file_path:
+            
+            The file path can point to a single file, or the * wildcard can be used to match all files in a directory. The wildcard must match files, not directories.
+            
+            PARTITION:
+            
+            If this parameter is specified, only the specified partitions are loaded, and data outside these partitions is filtered out.
+            If not specified, all partitions of the table are loaded by default.
+            
+            NEGATIVE:
+            If this parameter is specified, it is equivalent to loading a batch of "negative" data, used to offset the same batch of data loaded before.
+            This parameter applies only when value columns exist and their aggregation type is SUM.
+            
+            column_separator:
+            
+            Specifies the column separator in the source file. The default is \t.
+            If the character is invisible, prefix it with \x and express the separator in hexadecimal.
+            For example, the \x01 separator of Hive files is specified as "\\x01".
+            
+            file_type:
+            
+            Specifies the type of the source file, such as parquet or csv. By default it is determined by the file suffix.
+            
+            column_list:
+            
+            Specifies the correspondence between columns in the source file and columns in the table.
+            To skip a column in the source file, name it as a column that does not exist in the table.
+            Syntax:
+            (col_name1, col_name2, ...)
+            
+            SET:
+            
+            If this parameter is specified, columns of the source file can be transformed by functions and the transformed results loaded into the table.
+            Example 1: the table has columns (c1, c2, c3) and the source file has four columns. The first and second source columns map to c1 and c2, and the sum of the third and fourth maps to c3. The column mapping is columns (c1, c2, tmp_c3, tmp_c4) SET (c3 = tmp_c3 + tmp_c4);
+            
+            WHERE:
+            
+            Filters the transformed data; only rows that satisfy the WHERE predicate are loaded. The WHERE clause may only reference column names of the table.
+    
+    3. broker_name
+    
+        The name of the broker to use, which can be viewed with the show broker command.
+    
+    4. broker_properties
+    
+        Provides the information needed to access the data source through the broker. Different brokers, and different access methods, require different information.
+        
+        1. Baidu HDFS/AFS
+        
+            Access to Baidu's internal hdfs/afs currently only supports simple authentication, which requires:
+            username: hdfs username
+            password: hdfs password
+        
+        2. BOS
+        
+            Requires:
+            bos_endpoint: endpoint of BOS
+            bos_accesskey: accesskey of the public cloud user
+            bos_secret_accesskey: secret_accesskey of the public cloud user
+        
+        3. Apache HDFS
+        
+            Community HDFS supports simple authentication and Kerberos authentication, as well as HA configuration.
+            Simple authentication:
+            hadoop.security.authentication = simple (default)
+            username: hdfs username
+            password: hdfs password
+            
+            Kerberos authentication:
+            hadoop.security.authentication = kerberos
+            kerberos_principal: Specifies the Kerberos principal.
+            kerberos_keytab: Specifies the path of the Kerberos keytab file. The file must reside on the server where the broker process runs.
+            kerberos_keytab_content: Specifies the base64-encoded content of the Kerberos keytab file. Choose either this or kerberos_keytab, not both.
+            
+            namenode HA:
+            By configuring namenode HA, the new namenode can be identified automatically when a namenode switch happens.
+            dfs.nameservices: Specifies a custom name for the HDFS service, e.g. "dfs.nameservices" = "my_ha"
+            dfs.ha.namenodes.xxx: Customizes the namenode names, separated by commas, where xxx is the custom name from dfs.nameservices, e.g. "dfs.ha.namenodes.my_ha" = "my_nn"
+            dfs.namenode.rpc-address.xxx.nn: Specifies the RPC address of a namenode, where nn is a namenode name configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
+            dfs.client.failover.proxy.provider: Specifies the provider the client uses to connect to the namenode, default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+    
+    5. opt_properties
+    
+        Used to specify some special parameters.
+        Syntax:
+        [PROPERTIES ("key"="value", ...)]
+        
+        The following parameters can be specified:
+        timeout: Specifies the timeout of the load job, in seconds. The default timeout is 4 hours.
+        max_filter_ratio: The maximum ratio of data that may be filtered out (for reasons such as irregular data). The default is zero tolerance.
+        exec_mem_limit: Sets the memory limit of the load, in bytes. The default is 2GB. This is the memory limit on a single BE node.
+        A load may be distributed across multiple BEs. Assume processing 1GB of data on a single node requires up to 5GB of memory, and a 1GB file is distributed across two nodes; then each node theoretically needs 2.5GB of memory, and the parameter can be set to 2684354560, i.e. 2.5GB.
+        strict_mode: Whether the load runs in strict mode. The default is true.
+    
+    6. Import data format sample
+    
+        Integer classes (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
+        Floating point classes (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, 356
+        Date classes (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
+        (Note: for other date formats, the strftime or time_format functions can be used in the load statement to convert them)
+        String classes (CHAR/VARCHAR): "I am a student", "a"
 NULL value: \N
 
-## example
-
-1. Import a batch of data from HDFS, specifying the timeout time and filtering ratio. Use the broker with the inscription my_hdfs_broker. Simple authentication.
-
-LOAD LABEL example db.label1
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-)
-WITH BROKER my_hdfs_broker
-(
-"Username" = "HDFS\\ user"
-"password" = "hdfs_passwd"
-)
-PROPERTIES
-(
-Timeout ="3600",
-"max_filter_ratio" = "0.1"
-);
-
-Where hdfs_host is the host of the namenode and hdfs_port is the fs.defaultFS port (default 9000)
-
-2. A batch of data from AFS, including multiple files. Import different tables, specify separators, and specify column correspondences.
-
-LOAD LABEL example db.label2
-(
-DATA INFILE ("afs http://afs host:hdfs /u port /user /palo /data /input /file1")
-INTO TABLE `my_table_1`
-COLUMNS TERMINATED BY ","
-(k1, k3, k2, v1, v2),
-DATA INFILE ("afs http://afs host:hdfs /u port /user /palo /data /input /file2")
-INTO TABLE `my_table_2`
-COLUMNS TERMINATED BY "\t"
-(k1, k2, k3, v2, v1)
-)
-WITH BROKER my_afs_broker
-(
-"username" ="abu user",
-"password" = "afs_passwd"
-)
-PROPERTIES
-(
-Timeout ="3600",
-"max_filter_ratio" = "0.1"
-);
-
-
-3. Import a batch of data from HDFS, specify hive's default delimiter x01, and use wildcard * to specify all files in the directory.
-Use simple authentication and configure namenode HA at the same time
-
-LOAD LABEL example db.label3
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
-INTO TABLE `my_table`
-COLUMNS TERMINATED BY "\\x01"
-)
-WITH BROKER my_hdfs_broker
-(
-"Username" = "HDFS\\ user"
-"password" = "hdfs_passwd",
-"dfs.nameservices" = "my_ha",
-"dfs.ha.namodes.my -ha" ="we named1, we named2",
-"dfs.namode.rpc -address.my ha.my name1" ="nn1 guest:rpc port",
-"dfs.namode.rpc -address.my ha.my name2" ="nn2 guest:rpc port",
-"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
-)
-
-4. Import a batch of "negative" data from HDFS. At the same time, Kerberos authentication is used. Provide KeyTab file path.
-
-LOAD LABEL example db.label4
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file)
-NEGATIVE
-INTO TABLE `my_table`
-COLUMNS TERMINATED BY "\t"
-)
-WITH BROKER my_hdfs_broker
-(
-"hadoop.security.authentication" = "kerberos",
-"kerberos" principal ="doris @YOUR.COM",
-"kerberos" keytab ="/home /palo /palo.keytab"
-)
-
-5. Import a batch of data from HDFS and specify partitions. At the same time, Kerberos authentication is used. Provides the KeyTab file content encoded by base64.
-
-LOAD LABEL example db.label5
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-PARTITION (p1, P2)
-COLUMNS TERMINATED BY ","
-(k1, k3, k2, v1, v2)
-)
-WITH BROKER my_hdfs_broker
-(
-"hadoop.security.authentication"="kerberos",
-"kerberos" principal ="doris @YOUR.COM",
-"kerberos" keytab "content"="BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
-)
-
-6. Import a batch of data from BOS, specify partitions, and make some transformations to the columns of imported files, as follows:
-The table structure is as follows:
-K1 date
-date
-k3 bigint
-k4 varchar (20)
-k5 varchar (64)
-k6 int
-
-Assume that the data file has only one row of data:
-
-1537002087,2018-08-09 11:12:13,1537002087,-,1
-
-The columns in the data file correspond to the columns specified in the import statement:
-tmp -u k1, tmp -u k2, tmp u k3, k6, v1
-
-The conversion is as follows:
-
-1) k1: Transform tmp_k1 timestamp column into datetime type data
-2) k2: Converting tmp_k2 datetime-type data into date data
-3) k3: Transform tmp_k3 timestamp column into day-level timestamp
-4) k4: Specify import default value of 1
-5) k5: Calculate MD5 values from tmp_k1, tmp_k2, tmp_k3 columns
-6) k6: Replace the - value in the imported file with 10
-
-LOAD LABEL example db.label6
-(
-DATA INFILE("bos://my_bucket/input/file")
-INTO TABLE `my_table`
-PARTITION (p1, P2)
-COLUMNS TERMINATED BY ","
-(tmp /u k1, tmp /u k2, tmp /u k3, k6, v1)
-SET (
-
-K2 = Time = UFormat ("% Y -% M -% D% H:% I = S", "% Y -% M -% D", TMP = UK2),
-k3 = alignment_timestamp("day", tmp_k3),
-k4 = default_value("1"),
-K5 = MD5Sum (TMP = UK1, TMP = UK2, TMP = UK3)
-k6 = replace value ("-", "10")
-)
-)
-WITH BROKER my_bos_broker
-(
-"bosu endpoint" ="http://bj.bcebos.com",
-"bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
-"bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
-)
-
-7. Import data into tables containing HLL columns, which can be columns in tables or columns in data
-
-If there are three columns in the table (id, v1, v2). Where V1 and V2 columns are HLL columns. The imported source file has three columns. In column_list, it is declared that the first column is ID and the second and third column is K1 and k2, which are temporarily named.
-In SET, the HLL column in the table must be specifically declared hll_hash. The V1 column in the table is equal to the hll_hash (k1) column in the original data.
-LOAD LABEL example db.label7
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-PARTITION (p1, P2)
-COLUMNS TERMINATED BY ","
-(id, k1, k2)
-SET (
-v1 = hll, u hash (k1),
-v2 = hll, u hash (k2)
-)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-LOAD LABEL example db.label8
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-PARTITION (p1, P2)
-COLUMNS TERMINATED BY ","
-(k1, k2, tmp u k3, tmp u k4, v1, v2)
-SET (
-v1 = hll, u hash (tmp
-v2 = hll, u hash (tmp
-)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
-8. Importing data into Parquet file specifies FORMAT as parquet, which is judged by file suffix by default.
-LOAD LABEL example db.label9
-(
-DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
-INTO TABLE `my_table`
-FORMAT AS "parquet"
-(k1, k2, k3)
-)
-WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
-
+## example
+
+    1. Load a batch of data from HDFS, specifying the timeout and the filter ratio. Use the broker named my_hdfs_broker, with simple authentication.
+    
+        LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "username" = "hdfs_user",
+        "password" = "hdfs_passwd"
+        )
+        PROPERTIES
+        (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+        );
+        
+        Here hdfs_host is the host of the namenode and hdfs_port is the fs.defaultFS port (default 9000).
+    
+    2. Load a batch of data from AFS consisting of multiple files, into different tables, specifying the separators and the column mappings.
+    
+        LOAD LABEL example_db.label2
+        (
+        DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file1")
+        INTO TABLE `my_table_1`
+        COLUMNS TERMINATED BY ","
+        (k1, k3, k2, v1, v2),
+        DATA INFILE("afs://afs_host:hdfs_port/user/palo/data/input/file2")
+        INTO TABLE `my_table_2`
+        COLUMNS TERMINATED BY "\t"
+        (k1, k2, k3, v2, v1)
+        )
+        WITH BROKER my_afs_broker
+        (
+        "username" = "afs_user",
+        "password" = "afs_passwd"
+        )
+        PROPERTIES
+        (
+        "timeout" = "3600",
+        "max_filter_ratio" = "0.1"
+        );
+    
+    
+    3. Load a batch of data from HDFS, specifying Hive's default delimiter \x01 and using the wildcard * to match all files in the directory.
+    Use simple authentication and configure namenode HA at the same time.
+    
+        LOAD LABEL example_db.label3
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/*")
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY "\\x01"
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "username" = "hdfs_user",
+        "password" = "hdfs_passwd",
+        "dfs.nameservices" = "my_ha",
+        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
+        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
+        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
+        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+        )
+    
+    4. Load a batch of "negative" data from HDFS, using Kerberos authentication and providing the keytab file path.
+    
+        LOAD LABEL example_db.label4
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/old_file")
+        NEGATIVE
+        INTO TABLE `my_table`
+        COLUMNS TERMINATED BY "\t"
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "hadoop.security.authentication" = "kerberos",
+        "kerberos_principal" = "doris@YOUR.COM",
+        "kerberos_keytab" = "/home/palo/palo.keytab"
+        )
+    
+    5. Load a batch of data from HDFS into the specified partitions, using Kerberos authentication and providing the base64-encoded keytab file content.
+    
+        LOAD LABEL example_db.label5
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, k3, k2, v1, v2)
+        )
+        WITH BROKER my_hdfs_broker
+        (
+        "hadoop.security.authentication" = "kerberos",
+        "kerberos_principal" = "doris@YOUR.COM",
+        "kerberos_keytab_content" = "BQIAAABEAAEACUJBSURVLkNPTQAEcGFsbw"
+        )
+    
+    6. Load a batch of data from BOS into the specified partitions, applying some transformations to the columns of the source file, as follows:
+        The table structure is:
+        k1 datetime
+        k2 date
+        k3 bigint
+        k4 varchar(20)
+        k5 varchar(64)
+        k6 int
+        
+        Assume the data file contains only one row:
+        
+        1537002087,2018-08-09 11:12:13,1537002087,-,1
+        
+        The columns in the data file correspond to the columns specified in the load statement:
+        tmp_k1, tmp_k2, tmp_k3, k6, v1
+        
+        The transformations are:
+        
+        1) k1: transform the tmp_k1 timestamp column into datetime data
+        2) k2: convert the tmp_k2 datetime data into date data
+        3) k3: align the tmp_k3 timestamp column to a day-level timestamp
+        4) k4: use a default value of 1
+        5) k5: compute the MD5 value of the tmp_k1, tmp_k2, tmp_k3 columns
+        6) k6: replace the value "-" in the source file with 10
+        
+        LOAD LABEL example_db.label6
+        (
+        DATA INFILE("bos://my_bucket/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (tmp_k1, tmp_k2, tmp_k3, k6, v1)
+        SET (
+        k1 = strftime("%Y-%m-%d %H:%i:%S", tmp_k1),
+        k2 = time_format("%Y-%m-%d %H:%i:%S", "%Y-%m-%d", tmp_k2),
+        k3 = alignment_timestamp("day", tmp_k3),
+        k4 = default_value("1"),
+        k5 = md5sum(tmp_k1, tmp_k2, tmp_k3),
+        k6 = replace_value("-", "10")
+        )
+        )
+        WITH BROKER my_bos_broker
+        (
+        "bos_endpoint" = "http://bj.bcebos.com",
+        "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyy"
+        )
+    
+    7. Load data into a table containing HLL columns, which can be mapped from columns of the table or from columns in the data.
+    
+        Suppose the table has three columns (id, v1, v2), where v1 and v2 are HLL columns, and the source file has three columns. column_list declares that the first column is id and the second and third columns are temporarily named k1 and k2.
+        In SET, an HLL column of the table must be declared via hll_hash: the v1 column of the table equals the hll_hash(k1) column of the source data.
+        LOAD LABEL example_db.label7
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (id, k1, k2)
+        SET (
+        v1 = hll_hash(k1),
+        v2 = hll_hash(k2)
+        )
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+        
+        LOAD LABEL example_db.label8
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        PARTITION (p1, p2)
+        COLUMNS TERMINATED BY ","
+        (k1, k2, tmp_k3, tmp_k4, v1, v2)
+        SET (
+        v1 = hll_hash(tmp_k3),
+        v2 = hll_hash(tmp_k4)
+        )
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+    
+    8. Load data from a Parquet file, specifying FORMAT as parquet. By default the format is determined from the file suffix.
+        LOAD LABEL example_db.label9
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        FORMAT AS "parquet"
+        (k1, k2, k3)
+        )
+        WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password");
+
+    9. Filter the data to be loaded: only rows whose k1 value is greater than their k2 value are loaded.
+        LOAD LABEL example_db.label10
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+        INTO TABLE `my_table`
+        where k1 > k2
+        )
 ## keyword
 BROKER,LOAD
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index 458fe0d..7aa05d2 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -1056,8 +1056,10 @@ data_desc ::=
     opt_col_list:colList
     opt_columns_from_path:columnsFromPath
     opt_col_mapping_list:colMappingList
+    where_clause:whereExpr
     {:
-        RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, columnsFromPath, isNeg, colMappingList);
+        RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat,
+        columnsFromPath, isNeg, colMappingList, whereExpr);
     :}
     ;
 
@@ -3142,10 +3144,10 @@ expr_list ::=
   ;
 
 where_clause ::=
-  KW_WHERE expr:e
-  {: RESULT = e; :}
-  | /* empty */
+   /* empty */
   {: RESULT = null; :}
+  | KW_WHERE expr:e
+  {: RESULT = e; :}
   ;
 
 where_clause_without_null ::=
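
The grammar change above only reorders the alternatives so that where_clause can be reused by data_desc: an absent WHERE still yields null, a present one yields the parsed expression. A toy Java sketch of those semantics (this is not the CUP-generated parser, just an illustration):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    // Toy model of the where_clause production: empty -> null, KW_WHERE expr -> expr.
    public class OptionalWhere {
        static String parseOptionalWhere(Deque<String> tokens) {
            if (tokens.isEmpty() || !"WHERE".equalsIgnoreCase(tokens.peek())) {
                return null;                          // empty alternative: RESULT = null
            }
            tokens.poll();                            // consume KW_WHERE
            return String.join(" ", tokens);          // remaining tokens stand in for expr
        }

        public static void main(String[] args) {
            System.out.println(parseOptionalWhere(new ArrayDeque<>(List.of("WHERE", "col1", ">", "1")))); // col1 > 1
            System.out.println(parseOptionalWhere(new ArrayDeque<>()));                                   // null
        }
    }
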
diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
index ff86177..e3fce84 100644
--- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -91,6 +91,7 @@ public class DataDescription {
     private final List<String> columnsFromPath;
     // save column mapping in SET(xxx = xxx) clause
     private final List<Expr> columnMappingList;
+    private final Expr whereExpr;
 
     // Used for mini load
     private TNetworkAddress beAddr;
@@ -116,7 +117,7 @@ public class DataDescription {
                            String fileFormat,
                            boolean isNegative,
                            List<Expr> columnMappingList) {
-        this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, isNegative, columnMappingList);
+        this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, isNegative, columnMappingList, null);
     }
 
     public DataDescription(String tableName,
@@ -127,7 +128,8 @@ public class DataDescription {
                            String fileFormat,
                            List<String> columnsFromPath,
                            boolean isNegative,
-                           List<Expr> columnMappingList) {
+                           List<Expr> columnMappingList,
+                           Expr whereExpr) {
         this.tableName = tableName;
         this.partitionNames = partitionNames;
         this.filePaths = filePaths;
@@ -137,6 +139,7 @@ public class DataDescription {
         this.columnsFromPath = columnsFromPath;
         this.isNegative = isNegative;
         this.columnMappingList = columnMappingList;
+        this.whereExpr = whereExpr;
     }
 
     public String getTableName() {
@@ -147,6 +150,10 @@ public class DataDescription {
         return partitionNames;
     }
 
+    public Expr getWhereExpr(){
+        return whereExpr;
+    }
+
     public List<String> getFilePaths() {
         return filePaths;
     }
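
The new constructor parameter simply threads the parsed WHERE expression from the statement down to the objects that later build the scan plan. A simplified, self-contained Java sketch of that flow (these classes are stand-ins, not the Doris originals):

    // Simplified stand-ins showing how a parsed WHERE expression travels from the
    // data description to the file group that the planner later reads it from.
    public class WhereExprFlow {
        static class DataDesc {
            private final String tableName;
            private final String whereExpr;           // null when the statement has no WHERE
            DataDesc(String tableName, String whereExpr) {
                this.tableName = tableName;
                this.whereExpr = whereExpr;
            }
            String getTableName() { return tableName; }
            String getWhereExpr() { return whereExpr; }
        }

        static class FileGroup {
            private final String whereExpr;
            FileGroup(DataDesc desc) { this.whereExpr = desc.getWhereExpr(); }
            String getWhereExpr() { return whereExpr; }
        }

        public static void main(String[] args) {
            FileGroup group = new FileGroup(new DataDesc("tbl2", "col1 > 1"));
            // The scan node turns this into conjuncts; null simply means "no filtering".
            System.out.println(group.getWhereExpr());
        }
    }
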
diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index bc1b623..78b74d1 100644
--- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -74,6 +74,8 @@ public class BrokerFileGroup implements Writable {
     private List<ImportColumnDesc> columnExprList;
     // this is only for hadoop function check
     private Map<String, Pair<String, List<String>>> columnToHadoopFunction;
+    // filter the data that has been transformed
+    private Expr whereExpr;
 
     // Used for recovery from edit log
     private BrokerFileGroup() {
@@ -94,6 +96,7 @@ public class BrokerFileGroup implements Writable {
         this.columnsFromPath = dataDescription.getColumnsFromPath();
         this.columnExprList = dataDescription.getParsedColumnExprList();
         this.columnToHadoopFunction = dataDescription.getColumnToHadoopFunction();
+        this.whereExpr = dataDescription.getWhereExpr();
     }
 
     // NOTE: DBLock will be held
@@ -189,6 +192,10 @@ public class BrokerFileGroup implements Writable {
         return partitionIds;
     }
 
+    public Expr getWhereExpr() {
+        return whereExpr;
+    }
+
     public List<String> getFilePaths() {
         return filePaths;
     }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 7a5f682..9c87721 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -239,12 +239,12 @@ public class BrokerLoadJob extends LoadJob {
                 // retry task
                 idToTasks.remove(loadTask.getSignature());
                 if (loadTask instanceof LoadLoadingTask) {
-                    loadStatistic.numLoadedRowsMap.remove(((LoadLoadingTask) loadTask).getLoadId());
+                    loadStatistic.numScannedRowsMap.remove(((LoadLoadingTask) loadTask).getLoadId());
                 }
                 loadTask.updateRetryInfo();
                 idToTasks.put(loadTask.getSignature(), loadTask);
                 if (loadTask instanceof LoadLoadingTask) {
-                    loadStatistic.numLoadedRowsMap.put(((LoadLoadingTask) loadTask).getLoadId(), new AtomicLong(0));
+                    loadStatistic.numScannedRowsMap.put(((LoadLoadingTask) loadTask).getLoadId(), new AtomicLong(0));
                 }
                 Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
                 return;
@@ -365,7 +365,7 @@ public class BrokerLoadJob extends LoadJob {
                 // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
                 // use newLoadingTasks to save new created loading tasks and submit them later.
                 newLoadingTasks.add(task);
-                loadStatistic.numLoadedRowsMap.put(loadId, new AtomicLong(0));
+                loadStatistic.numScannedRowsMap.put(loadId, new AtomicLong(0));
 
                 // save all related tables and rollups in transaction state
                 TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(transactionId);
@@ -463,6 +463,8 @@ public class BrokerLoadJob extends LoadJob {
                                      increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
         loadingStatus.replaceCounter(DPP_NORMAL_ALL,
                                      increaseCounter(DPP_NORMAL_ALL, attachment.getCounter(DPP_NORMAL_ALL)));
+        loadingStatus.replaceCounter(UNSELECTED_ROWS,
+                                     increaseCounter(UNSELECTED_ROWS, attachment.getCounter(UNSELECTED_ROWS)));
         if (attachment.getTrackingUrl() != null) {
             loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
         }
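
The replaceCounter/increaseCounter calls above accumulate per-task counters into the job-level loading status, now including UNSELECTED_ROWS. A self-contained sketch of that accumulation over string-valued counter maps (hypothetical names, same idea):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative accumulation of per-task load counters into a job-level map.
    public class CounterMerge {
        static void increase(Map<String, String> jobCounters, Map<String, String> taskCounters) {
            for (Map.Entry<String, String> e : taskCounters.entrySet()) {
                long current = Long.parseLong(jobCounters.getOrDefault(e.getKey(), "0"));
                long delta = Long.parseLong(e.getValue());
                jobCounters.put(e.getKey(), String.valueOf(current + delta));
            }
        }

        public static void main(String[] args) {
            Map<String, String> job = new HashMap<>();
            increase(job, Map.of("dpp.norm.ALL", "100", "dpp.abnorm.ALL", "1", "unselected.rows", "3"));
            increase(job, Map.of("dpp.norm.ALL", "200", "unselected.rows", "2"));
            System.out.println(job); // dpp.norm.ALL=300, dpp.abnorm.ALL=1, unselected.rows=5 (order may vary)
        }
    }
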
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 4219106..2cc30c6 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -77,6 +77,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     protected static final String QUALITY_FAIL_MSG = "quality not good enough to cancel";
     protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL";
     protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
+    public static final String UNSELECTED_ROWS = "unselected.rows";
 
     protected long id;
     // input params
@@ -129,22 +130,22 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     public static class LoadStatistic {
         // number of rows processed on BE, this number will be updated periodically by query report.
         // A load job may has several load tasks, so the map key is load task's plan load id.
-        public Map<TUniqueId, AtomicLong> numLoadedRowsMap = Maps.newConcurrentMap();
+        public Map<TUniqueId, AtomicLong> numScannedRowsMap = Maps.newConcurrentMap();
         // number of file to be loaded
         public int fileNum = 0;
         public long totalFileSizeB = 0;
         
         public String toJson() {
             long total = 0;
-            for (AtomicLong atomicLong : numLoadedRowsMap.values()) {
+            for (AtomicLong atomicLong : numScannedRowsMap.values()) {
                 total += atomicLong.get();
             }
 
             Map<String, Object> details = Maps.newHashMap();
-            details.put("LoadedRows", total);
+            details.put("ScannedRows", total);
             details.put("FileNumber", fileNum);
             details.put("FileSize", totalFileSizeB);
-            details.put("TaskNumber", numLoadedRowsMap.size());
+            details.put("TaskNumber", numScannedRowsMap.size());
             Gson gson = new Gson();
             return gson.toJson(details);
         }
@@ -221,10 +222,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return transactionId;
     }
 
-    public void updateLoadedRows(TUniqueId loadId, long loadedRows) {
-        AtomicLong atomicLong = loadStatistic.numLoadedRowsMap.get(loadId);
+    public void updateScannedRows(TUniqueId loadId, long scannedRows) {
+        AtomicLong atomicLong = loadStatistic.numScannedRowsMap.get(loadId);
         if (atomicLong != null) {
-            atomicLong.set(loadedRows);
+            atomicLong.set(scannedRows);
         }
     }
 
@@ -509,7 +510,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
         }
         idToTasks.clear();
-        loadStatistic.numLoadedRowsMap.clear();
+        loadStatistic.numScannedRowsMap.clear();
 
         // set failMsg and state
         this.failMsg = failMsg;
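
The LoadStatistic.toJson change renames the progress key from LoadedRows to ScannedRows, which is what SHOW LOAD now prints in the JobDetails field. A minimal sketch of the resulting JSON shape, assuming Gson (already used by LoadJob) is on the classpath; the values are made up:

    import com.google.gson.Gson;
    import java.util.HashMap;
    import java.util.Map;

    // Builds a JobDetails-style JSON string like the one shown in the SHOW LOAD output above:
    // {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
    public class JobDetailsJson {
        public static void main(String[] args) {
            Map<String, Object> details = new HashMap<>();
            details.put("ScannedRows", 28133395L);   // summed over all loading tasks, refreshed periodically
            details.put("TaskNumber", 1);
            details.put("FileNumber", 1);
            details.put("FileSize", 200000L);
            System.out.println(new Gson().toJson(details));
        }
    }
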
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index fb76b39..ab3b504 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -560,10 +560,10 @@ public class LoadManager implements Writable{
         return false;
     }
 
-    public void updateJobLoadedRows(Long jobId, TUniqueId loadId, long loadedRows) {
+    public void updateJobScannedRows(Long jobId, TUniqueId loadId, long scannedRows) {
         LoadJob job = idToLoadJob.get(jobId);
         if (job != null) {
-            job.updateLoadedRows(loadId, loadedRows);
+            job.updateScannedRows(loadId, scannedRows);
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index e5817d8..5aab4a8 100644
--- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -74,7 +74,7 @@ import java.util.Map;
 import java.util.Random;
 
 // Broker scan node
-public class BrokerScanNode extends ScanNode {
+public class BrokerScanNode extends LoadScanNode {
     private static final Logger LOG = LogManager.getLogger(BrokerScanNode.class);
     private static final TBrokerFileStatusComparator T_BROKER_FILE_STATUS_COMPARATOR
             = new TBrokerFileStatusComparator();
@@ -158,18 +158,16 @@ public class BrokerScanNode extends ScanNode {
         getFileStatusAndCalcInstance();
 
         paramCreateContexts = Lists.newArrayList();
-        int i = 0;
         for (BrokerFileGroup fileGroup : fileGroups) {
             ParamCreateContext context = new ParamCreateContext();
             context.fileGroup = fileGroup;
             context.timezone = analyzer.getTimezone();
             try {
-                initParams(context, fileStatusesList.get(i));
+                initParams(context);
             } catch (AnalysisException e) {
                 throw new UserException(e.getMessage());
             }
             paramCreateContexts.add(context);
-            ++i;
         }
     }
 
@@ -201,8 +199,8 @@ public class BrokerScanNode extends ScanNode {
     }
 
     // Called from init, construct source tuple information
-    private void initParams(ParamCreateContext context, List<TBrokerFileStatus> fileStatus)
-            throws AnalysisException, UserException {
+    private void initParams(ParamCreateContext context)
+            throws UserException {
         TBrokerScanRangeParams params = new TBrokerScanRangeParams();
         context.params = params;
 
@@ -212,6 +210,7 @@ public class BrokerScanNode extends ScanNode {
         params.setStrict_mode(strictMode);
         params.setProperties(brokerDesc.getProperties());
         initColumns(context);
+        initWhereExpr(fileGroup.getWhereExpr(), analyzer);
     }
 
     /**
diff --git a/fe/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/LoadScanNode.java
new file mode 100644
index 0000000..29a6dc6
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ExprSubstitutionMap;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.UserException;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class LoadScanNode extends ScanNode {
+
+    public LoadScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
+        super(id, desc, planNodeName);
+    }
+
+    protected void initWhereExpr(Expr whereExpr, Analyzer analyzer) throws UserException {
+        if (whereExpr == null) {
+            return;
+        }
+        
+        Map<String, SlotDescriptor> dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        for (SlotDescriptor slotDescriptor : desc.getSlots()) {
+            dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor);
+        }
+
+        // substitute SlotRef in filter expression
+        // where expr must be rewrite first to transfer some predicates(eg: BetweenPredicate to BinaryPredicate)
+        whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
+        List<SlotRef> slots = Lists.newArrayList();
+        whereExpr.collect(SlotRef.class, slots);
+
+        ExprSubstitutionMap smap = new ExprSubstitutionMap();
+        for (SlotRef slot : slots) {
+            SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName());
+            if (slotDesc == null) {
+                throw new UserException("unknown column reference in where statement, reference="
+                                                + slot.getColumnName());
+            }
+            smap.getLhs().add(slot);
+            smap.getRhs().add(new SlotRef(slotDesc));
+        }
+        whereExpr = whereExpr.clone(smap);
+        whereExpr.analyze(analyzer);
+        if (whereExpr.getType() != Type.BOOLEAN) {
+            throw new UserException("where statement is not a valid statement return bool");
+        }
+        addConjuncts(whereExpr.getConjuncts());
+    }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 3a4a6a5..440b13a 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -20,7 +20,6 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.ArithmeticExpr;
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.NullLiteral;
@@ -61,7 +60,7 @@ import java.util.Map;
 /**
  * used to scan from stream
  */
-public class StreamLoadScanNode extends ScanNode {
+public class StreamLoadScanNode extends LoadScanNode {
     private static final Logger LOG = LogManager.getLogger(StreamLoadScanNode.class);
 
     private TUniqueId loadId;
@@ -123,37 +122,7 @@ public class StreamLoadScanNode extends ScanNode {
                 exprsByName, analyzer, srcTupleDesc, slotDescByName, params);
 
         // analyze where statement
-        if (streamLoadTask.getWhereExpr() != null) {
-            Map<String, SlotDescriptor> dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-            for (SlotDescriptor slotDescriptor : desc.getSlots()) {
-                dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor);
-            }
-
-            // substitute SlotRef in filter expression
-            Expr whereExpr = streamLoadTask.getWhereExpr();
-            // where expr must be rewrite first to transfer some predicates(eg: BetweenPredicate to BinaryPredicate)
-            whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
-
-            List<SlotRef> slots = Lists.newArrayList();
-            whereExpr.collect(SlotRef.class, slots);
-
-            ExprSubstitutionMap smap = new ExprSubstitutionMap();
-            for (SlotRef slot : slots) {
-                SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName());
-                if (slotDesc == null) {
-                    throw new UserException("unknown column reference in where statement, reference="
-                            + slot.getColumnName());
-                }
-                smap.getLhs().add(slot);
-                smap.getRhs().add(new SlotRef(slotDesc));
-            }
-            whereExpr= whereExpr.clone(smap);
-            whereExpr.analyze(analyzer);
-            if (whereExpr.getType() != Type.BOOLEAN) {
-                throw new UserException("where statement is not a valid statement return bool");
-            }
-            addConjuncts(whereExpr.getConjuncts());
-        }
+        initWhereExpr(streamLoadTask.getWhereExpr(), analyzer);
 
         computeStats(analyzer);
         createDefaultSmap(analyzer);
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 4680720..4bfa38d 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.LoadErrorHub;
+import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.planner.DataPartition;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.DataStreamSink;
@@ -538,6 +539,11 @@ public class Coordinator {
             if (value != null) {
                 numRowsAbnormal = Long.valueOf(value);
             }
+            long numRowsUnselected = 0L;
+            value = this.loadCounters.get(LoadJob.UNSELECTED_ROWS);
+            if (value != null) {
+                numRowsUnselected = Long.valueOf(value);
+            }
 
             // new load counters
             value = newLoadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
@@ -548,9 +554,14 @@ public class Coordinator {
             if (value != null) {
                 numRowsAbnormal += Long.valueOf(value);
             }
+            value = newLoadCounters.get(LoadJob.UNSELECTED_ROWS);
+            if (value != null) {
+                numRowsUnselected += Long.valueOf(value);
+            }
 
             this.loadCounters.put(LoadEtlTask.DPP_NORMAL_ALL, "" + numRowsNormal);
             this.loadCounters.put(LoadEtlTask.DPP_ABNORMAL_ALL, "" + numRowsAbnormal);
+            this.loadCounters.put(LoadJob.UNSELECTED_ROWS, "" + numRowsUnselected);
         } finally {
             lock.unlock();
         }
@@ -1180,7 +1191,7 @@ public class Coordinator {
         }
 
         if (params.isSetLoaded_rows()) {
-            Catalog.getCurrentCatalog().getLoadManager().updateJobLoadedRows(jobId, params.query_id, params.loaded_rows);
+            Catalog.getCurrentCatalog().getLoadManager().updateJobScannedRows(jobId, params.query_id, params.loaded_rows);
         }
 
         return;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org