Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/04 14:44:57 UTC

[doris] branch master updated: [Feature-Wip](MySQL Load)Show load warning for my sql load (#18224)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c36bef6bc [Feature-Wip](MySQL Load)Show load warning for my sql load (#18224)
7c36bef6bc is described below

commit 7c36bef6bc8943d83dedb97ec8e84124b9a2d103
Author: huangzhaowei <hu...@bytedance.com>
AuthorDate: Tue Apr 4 22:44:48 2023 +0800

    [Feature-Wip](MySQL Load)Show load warning for my sql load (#18224)
    
    1. Support `show load warnings` for MySQL Load to get the detailed error message.
    2. Fix `fillByteBufferAsync` not marking the load as finished in some data loads.
    3. Fix data draining so that it happens only in client mode.
---
 .../import/import-way/mysql-load-manual.md         | 29 +++++++++-
 .../import/import-way/mysql-load-manual.md         | 26 ++++++++-
 .../main/java/org/apache/doris/common/Config.java  |  2 +
 .../apache/doris/load/loadv2/MysqlLoadManager.java | 64 +++++++++++++++++++---
 .../java/org/apache/doris/qe/ShowExecutor.java     | 17 ++++++
 .../mysql_load/test_mysql_load_tiny_file.csv       |  1 +
 .../mysql_load/test_mysql_load_tiny_file.out       |  4 ++
 .../mysql_load/test_mysql_load_tiny_file.groovy    | 52 ++++++++++++++++++
 8 files changed, 185 insertions(+), 10 deletions(-)

diff --git a/docs/en/docs/data-operate/import/import-way/mysql-load-manual.md b/docs/en/docs/data-operate/import/import-way/mysql-load-manual.md
index 82dec28a1f..d9ca89a9b8 100644
--- a/docs/en/docs/data-operate/import/import-way/mysql-load-manual.md
+++ b/docs/en/docs/data-operate/import/import-way/mysql-load-manual.md
@@ -56,6 +56,15 @@ MySql Load currently only supports data formats: CSV (text).
 
 ## Basic operations
 
+### Client connection
+```bash
+mysql --local-infile  -h 127.0.0.1 -P 9030 -u root -D testdb
+```
+
+Note that to use MySQL Load, you must connect to the Doris server with the `--local-infile` option on the client command line.
+If you connect to Doris via JDBC, you must add the property `allowLoadLocalInfile=true` to the JDBC URL.
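
For JDBC users, the following is a minimal sketch of such a connection, assuming MySQL Connector/J is on the classpath; the CSV path is a placeholder, and the address, user, and table are taken from the examples in this manual.
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MysqlLoadJdbcExample {
    public static void main(String[] args) throws Exception {
        // allowLoadLocalInfile=true lets the driver send LOAD DATA LOCAL INFILE payloads.
        String url = "jdbc:mysql://127.0.0.1:9030/testdb?allowLoadLocalInfile=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement()) {
            // Placeholder path: the file lives on the machine running this client.
            stmt.execute("LOAD DATA LOCAL INFILE '/path/to/client_data.csv' "
                    + "INTO TABLE testdb.t1 COLUMNS TERMINATED BY ','");
        }
    }
}
```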
+
+
 ### Create test table
 ```sql
  CREATE TABLE testdb.t1 (pk INT, v1 INT SUM) AGGREGATE KEY (pk) DISTRIBUTED BY hash (pk) PROPERTIES ('replication_num' = '1');
@@ -113,9 +122,25 @@ Query OK, 1 row affected (0.17 sec)
 Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
 ```
 
+### Error result
+If the MySQL Load process fails, the client shows an error like the following:
+```text
+ERROR 1105 (HY000): errCode = 2, detailMessage = [INTERNAL_ERROR]too many filtered rows with load id b612907c-ccf4-4ac2-82fe-107ece655f0f
+```
+
+If you encounter this error, you can extract the `loadId` and use it in the `show load warnings` command to get a more detailed message.
+```sql
+show load warnings where label='b612907c-ccf4-4ac2-82fe-107ece655f0f';
+```
+
+In this case, the `loadId` serves as the label.
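
As a rough illustration of this flow from a JDBC client, the sketch below catches the load failure, extracts the load id from the error message, and issues the `show load warnings` statement; the regex and the generic column printing are illustrative assumptions, not part of this patch.
```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ShowLoadWarningsExample {
    // Failed loads report "... with load id <uuid>" in the error message.
    private static final Pattern LOAD_ID = Pattern.compile("load id ([0-9a-fA-F-]+)");

    static void loadWithDiagnostics(Connection conn, String loadSql) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(loadSql);
        } catch (SQLException e) {
            Matcher m = LOAD_ID.matcher(e.getMessage());
            if (!m.find()) {
                throw e; // no load id in the message, nothing more to look up
            }
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                         "SHOW LOAD WARNINGS WHERE LABEL = '" + m.group(1) + "'")) {
                ResultSetMetaData meta = rs.getMetaData();
                while (rs.next()) {
                    // Print every column of each warning row.
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        System.out.println(meta.getColumnLabel(i) + ": " + rs.getString(i));
                    }
                }
            }
        }
    }
}
```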
+
+
 ### Configuration
-1. `mysql_load_thread_pool`: the thread pool size for singe FE node, set 4 thread by default. The block queue size is 5 times of `mysql_load_thread_pool`. So FE can accept 4 + 4*5 = 24 requests in one time. Increase this configuration if the parallelism are larger than 24.
+1. `mysql_load_thread_pool`: the thread pool size of a single FE node, 4 threads by default. The blocking queue size is 5 times `mysql_load_thread_pool`, so one FE can accept 4 + 4\*5 = 24 requests at a time. Increase this configuration if the required parallelism is larger than 24.
 2. `mysql_load_server_secure_path`: the secure path for load data from server. Empty path by default means that it's not allowed for server load. Recommend to create a `local_import_data` directory under `DORIS_HOME` to load data if you want enable it.
+3. `mysql_load_in_memory_record`: the number of failed MySQL Load records to keep. The records are kept in memory, and only the latest 20 are retained by default. If you want to track more records, you can raise this config, but be careful about FE memory usage. Each record expires after one day, and an asynchronous thread cleans up expired records once a day. A sample `fe.conf` snippet is shown below.
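
As a rough example, a `fe.conf` excerpt raising these limits might look like the snippet below, assuming the usual `key = value` format; the values are only illustrative, and the defaults are 4, empty, and 20 respectively.
```text
mysql_load_thread_pool = 8
mysql_load_server_secure_path = /path/to/doris/local_import_data
mysql_load_in_memory_record = 100
```
Note that all three options are declared with `mutable = false` in `Config.java`, so changing them requires an FE restart.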
+
 
 ## Notice 
 
@@ -124,4 +149,4 @@ Records: 1 Deleted: 0 Skipped: 0 Warnings: 0
 
 ## More Help
 
-1. For more detailed syntax and best practices for using MySQL Load, see the [MySQL Load](../../../sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md) command manual.
\ No newline at end of file
+1. For more detailed syntax and best practices for using MySQL Load, see the [MySQL Load](../../../sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD.md) command manual.
diff --git a/docs/zh-CN/docs/data-operate/import/import-way/mysql-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/mysql-load-manual.md
index 45e2d9d33a..839bef8b23 100644
--- a/docs/zh-CN/docs/data-operate/import/import-way/mysql-load-manual.md
+++ b/docs/zh-CN/docs/data-operate/import/import-way/mysql-load-manual.md
@@ -56,6 +56,14 @@ MySQL Load 支持数据格式:CSV(文本)。
 
 ## 基本操作举例
 
+### 客户端连接
+```bash
+mysql --local-infile  -h 127.0.0.1 -P 9030 -u root -D testdb
+```
+
+注意: 执行MySQL Load语句的时候, 客户端命令必须带有`--local-infile`, 否则执行可能会出现错误. 如果是通过JDBC方式连接的话, 需要在URL中需要加入配置`allowLoadLocalInfile=true`
+
+
 ### 创建测试表
 ```sql
 CREATE TABLE testdb.t1 (pk INT, v1 INT SUM) AGGREGATE KEY (pk) DISTRIBUTED BY hash (pk) PROPERTIES ('replication_num' = '1');
@@ -115,9 +123,25 @@ Query OK, 1 row affected (0.17 sec)
 Records: 1  Deleted: 0  Skipped: 0  Warnings: 0
 ```
 
+### 异常结果
+如果执行出现异常, 会在客户端中出现如下异常显示
+```text
+ERROR 1105 (HY000): errCode = 2, detailMessage = [INTERNAL_ERROR]too many filtered rows with load id b612907c-ccf4-4ac2-82fe-107ece655f0f
+```
+
+当遇到这类异常错误, 可以找到其中的`loadId`, 可以通过`show load warnings`命令在客户端中展示详细的异常信息.
+```sql
+show load warnings where label='b612907c-ccf4-4ac2-82fe-107ece655f0f';
+```
+
+异常信息中的LoadId即为Warning命令中的label字段.
+
+
 ### 配置项
-1. `mysql_load_thread_pool`控制单个FE中MySQL Load并发执行线程个数, 默认为4. 线程池的排队对接大小为`mysql_load_thread_pool`的5倍, 因此默认情况下, 可以并发提交的任务为 4 + 4*5 = 24个. 如果并发个数超过24时, 可以调大该配置项.
+1. `mysql_load_thread_pool`控制单个FE中MySQL Load并发执行线程个数, 默认为4. 线程池的排队对接大小为`mysql_load_thread_pool`的5倍, 因此默认情况下, 可以并发提交的任务为 4 + 4\*5 = 24个. 如果并发个数超过24时, 可以调大该配置项.
 2. `mysql_load_server_secure_path`服务端导入的安全路径, 默认为空, 即不允许服务端导入. 如需开启这个功能, 建议在`DORIS_HOME`目录下创建一个`local_import_data`目录, 用于导入数据.
+3. `mysql_load_in_memory_record`失败的任务记录个数, 该记录会保留在内存中, 默认只会保留最近的20. 如果有需要可以调大该配置. 在内存中的记录, 有效期为1天, 异步清理线程会固定一天清理一次过期数据.
+
 
 ## 注意事项
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f8aa2c310e..c3211a3818 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2098,6 +2098,8 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = false)
     public static String mysql_load_server_secure_path = "";
 
+    @ConfField(mutable = false, masterOnly = false)
+    public static int mysql_load_in_memory_record = 20;
 
     @ConfField(mutable = false, masterOnly = false)
     public static int mysql_load_thread_pool = 4;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index 073191bd51..732075b068 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -39,6 +39,7 @@ import org.apache.doris.system.BeSelectionPolicy;
 import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.EvictingQueue;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -60,7 +61,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class MysqlLoadManager {
     private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);
@@ -103,14 +107,43 @@ public class MysqlLoadManager {
         }
     }
 
-    private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
+    private static class MySqlLoadFailRecord {
+        private final String label;
+
+        private final String errorUrl;
+        private final long startTime;
+
+        public MySqlLoadFailRecord(String label, String errorUrl) {
+            this.label = label;
+            this.errorUrl = errorUrl;
+            this.startTime = System.currentTimeMillis();
+        }
+
+        public String getLabel() {
+            return label;
+        }
+
+        public String getErrorUrl() {
+            return errorUrl;
+        }
+
+        public boolean isExpired() {
+            // Hard-code the expiration to one day.
+            return System.currentTimeMillis() > startTime + 24 * 60 * 60 * 1000;
+        }
+    }
 
+    private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
+    private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
+    private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
 
     public MysqlLoadManager(TokenManager tokenManager) {
         int poolSize = Config.mysql_load_thread_pool;
         // MySqlLoad pool can accept 4 + 4 * 5 = 24  requests by default.
         this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true);
         this.tokenManager = tokenManager;
+        this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record);
+        this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS);
     }
 
     public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt, String loadId)
@@ -131,20 +164,22 @@ public class MysqlLoadManager {
                     new SetVar(SessionVariable.QUERY_TIMEOUT, new StringLiteral(String.valueOf(newTimeOut))));
         }
         String token = tokenManager.acquireToken();
+        boolean clientLocal = dataDesc.isClientLocal();
+        MySqlLoadContext loadContext = new MySqlLoadContext();
+        loadContextMap.put(loadId, loadContext);
         LOG.info("execute MySqlLoadJob for id: {}.", loadId);
         try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
             for (String file : filePaths) {
-                InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file, loadId);
+                InputStreamEntity entity = getInputStreamEntity(context, clientLocal, file, loadId);
                 HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table, token);
-                MySqlLoadContext loadContext = new MySqlLoadContext();
                 loadContext.setRequest(request);
-                loadContextMap.put(loadId, loadContext);
                 try (final CloseableHttpResponse response = httpclient.execute(request)) {
                     String body = EntityUtils.toString(response.getEntity());
                     JsonObject result = JsonParser.parseString(body).getAsJsonObject();
                     if (!result.get("Status").getAsString().equalsIgnoreCase("Success")) {
+                        failedRecords.offer(new MySqlLoadFailRecord(loadId, result.get("ErrorURL").getAsString()));
                         LOG.warn("Execute mysql data load failed with request: {} and response: {}", request, body);
-                        throw new LoadException(result.get("Message").getAsString());
+                        throw new LoadException(result.get("Message").getAsString() + " with load id " + loadId);
                     }
                     loadResult.incRecords(result.get("NumberLoadedRows").getAsLong());
                     loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
@@ -153,7 +188,7 @@ public class MysqlLoadManager {
         } catch (Throwable t) {
             LOG.warn("Execute mysql load {} failed", loadId, t);
             // drain the data from client conn util empty packet received, otherwise the connection will be reset
-            if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
+            if (clientLocal && loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
                 LOG.warn("not drained yet, try reading left data from client connection for load {}.", loadId);
                 ByteBuffer buffer = context.getMysqlChannel().fetchOnePacket();
                 // MySql client will send an empty packet when eof
@@ -184,7 +219,22 @@ public class MysqlLoadManager {
         }
     }
 
-    public int extractTimeOut(DataDescription desc) {
+    public String getErrorUrlByLoadId(String loadId) {
+        for (MySqlLoadFailRecord record : failedRecords) {
+            if (loadId.equals(record.getLabel())) {
+                return record.getErrorUrl();
+            }
+        }
+        return null;
+    }
+
+    private void cleanFailedRecords() {
+        while (!failedRecords.isEmpty() && failedRecords.peek().isExpired()) {
+            failedRecords.poll();
+        }
+    }
+
+    private int extractTimeOut(DataDescription desc) {
         if (desc.getProperties() != null && desc.getProperties().containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
             return Integer.parseInt(desc.getProperties().get(LoadStmt.TIMEOUT_PROPERTY));
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index b8a7d28bb6..cd6b9c98ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -210,6 +210,7 @@ import org.apache.logging.log4j.Logger;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.util.ArrayList;
@@ -1284,6 +1285,22 @@ public class ShowExecutor {
         }
 
         Env env = Env.getCurrentEnv();
+        // For MySQL Load, try to fetch the error URL first; MySQL Load only supports lookup by label.
+        if (showWarningsStmt.isFindByLabel()) {
+            String label = showWarningsStmt.getLabel();
+            String urlString = env.getLoadManager().getMysqlLoadManager().getErrorUrlByLoadId(label);
+            if (urlString != null && !urlString.isEmpty()) {
+                URL url;
+                try {
+                    url = new URL(urlString);
+                } catch (MalformedURLException e) {
+                    throw new AnalysisException("Invalid url: " + e.getMessage());
+                }
+                handleShowLoadWarningsFromURL(showWarningsStmt, url);
+                return;
+            }
+        }
+
         Database db = env.getInternalCatalog().getDbOrAnalysisException(showWarningsStmt.getDbName());
 
         long dbId = db.getId();
diff --git a/regression-test/data/load_p0/mysql_load/test_mysql_load_tiny_file.csv b/regression-test/data/load_p0/mysql_load/test_mysql_load_tiny_file.csv
new file mode 100644
index 0000000000..9874d6464a
--- /dev/null
+++ b/regression-test/data/load_p0/mysql_load/test_mysql_load_tiny_file.csv
@@ -0,0 +1 @@
+1	2
diff --git a/regression-test/data/load_p0/mysql_load/test_mysql_load_tiny_file.out b/regression-test/data/load_p0/mysql_load/test_mysql_load_tiny_file.out
new file mode 100644
index 0000000000..215942b6d2
--- /dev/null
+++ b/regression-test/data/load_p0/mysql_load/test_mysql_load_tiny_file.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	40
+
diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load_tiny_file.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load_tiny_file.groovy
new file mode 100644
index 0000000000..581dede4ef
--- /dev/null
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load_tiny_file.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mysql_load_tiny_file", "p0") {
+    def tableName = "test_mysql_load_tiny_file"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` INT,
+            `v5` INT SUM
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-10240000"), ("100000")),
+        PARTITION partition_b VALUES [("100000"), ("1000000000")),
+        PARTITION partition_d VALUES [("1000000000"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    def test_mysql_load_tiny_file = getLoalFilePath "test_mysql_load_tiny_file.csv"
+
+    for (int i = 0; i < 20; i++) {
+        sql """
+            LOAD DATA 
+            LOCAL
+            INFILE '${test_mysql_load_tiny_file}'
+            INTO TABLE ${tableName}
+            COLUMNS TERMINATED BY '\t';
+        """
+    }
+
+    sql "sync"
+    qt_sql "select * from ${tableName} order by k1"
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org