You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/04/29 01:20:58 UTC

[incubator-doris] branch master updated: support show stream load sql (#5488)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9001fd2  support show stream load sql (#5488)
9001fd2 is described below

commit 9001fd28f4d393a2b08a1d23ff34cbfae0f3be8a
Author: weizuo93 <68...@users.noreply.github.com>
AuthorDate: Thu Apr 29 09:20:35 2021 +0800

    support show stream load sql (#5488)
    
    Co-authored-by: weizuo <we...@xiaomi.com>
---
 .../Data Manipulation/SHOW STREAM LOAD.md          |  68 +++++
 .../Data Manipulation/SHOW STREAM LOAD.md          |  69 +++++
 fe/fe-core/src/main/cup/sql_parser.cup             |   7 +-
 .../apache/doris/analysis/ShowStreamLoadStmt.java  | 323 +++++++++++++++++++++
 .../main/java/org/apache/doris/common/Config.java  |  12 +
 .../org/apache/doris/load/StreamLoadRecord.java    |  91 ++++++
 .../org/apache/doris/load/StreamLoadRecordMgr.java | 192 ++++++++++++
 .../java/org/apache/doris/qe/ShowExecutor.java     |  56 ++++
 fe/fe-core/src/main/jflex/sql_scanner.flex         |   1 +
 9 files changed, 818 insertions(+), 1 deletion(-)

diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW STREAM LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW STREAM LOAD.md
new file mode 100644
index 0000000..fc870e7
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW STREAM LOAD.md	
@@ -0,0 +1,68 @@
+---
+{
+    "title": "SHOW STREAM LOAD",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SHOW STREAM LOAD
+## Description
+This statement is used to show the execution of the specified import task
+Grammar:
+SHOW STREAM LOAD
+[FROM both names]
+[
+WHERE
+[LABEL [ = "your_label" | LIKE "label_matcher"]]
+[STATUS = ["SUCCESS"|"FAIL"|]]
+]
+[ORDER BY ...]
+[LIMIT limit][OFFSET offset];
+
+Explain:
+1) If db_name is not specified, use the current default DB
+2) If you use LABEL LIKE, the label that matches the task contains the STREAM LOAD task of label_matcher
+3) If LABEL = is used, the specified label is matched accurately.
+4) If STATUS is specified, the STREAM LOAD status is matched
+5) Arbitrary column combinations can be sorted using ORDER BY
+6) If LIMIT is specified, the limit bar matching record is displayed. Otherwise, all of them will be displayed.
+7) If OFFSET is specified, the query results are displayed from offset. By default, the offset is 0.
+
+## example
+1. Show all STREAM LOAD tasks of default DB
+SHOW STREAM LOAD;
+
+2. Show the STREAM LOAD task of the specified db. The label contains the string "2014_01_02", showing the oldest 10
+SHOW STREAM LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
+
+3. Show the STREAM LOAD task of the specified db, specify label as "load_example_db_20140102"
+SHOW STREAM LOAD FROM example_db WHERE LABEL = "load_example_db_20140102";
+
+4. Show the STREAM LOAD task of the specified db, specify status as "success", and sort it in descending order by LoadStartTime
+SHOW STREAM LOAD FROM example_db WHERE STATUS = "success" ORDER BY LoadStartTime DESC;
+
+5. Show the STREAM LOAD task of the specified dB and sort it in descending order by LoadStartTime, and display 10 query results starting with offset 5
+SHOW STREAM LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
+SHOW STREAM LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
+
+## keyword
+SHOW,STREAM LOAD
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW STREAM LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW STREAM LOAD.md
new file mode 100644
index 0000000..ec9f570
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW STREAM LOAD.md	
@@ -0,0 +1,69 @@
+---
+{
+    "title": "SHOW STREAM LOAD",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SHOW STREAM LOAD
+## description
+    该语句用于展示指定的Stream Load任务的执行情况
+    语法:
+        SHOW STREAM LOAD
+        [FROM db_name]
+        [
+            WHERE 
+            [LABEL [ = "your_label" | LIKE "label_matcher"]]
+            [STATUS = ["SUCCESS"|"FAIL"]]
+        ]
+        [ORDER BY ...]
+        [LIMIT limit][OFFSET offset];
+        
+    说明:
+        1) 如果不指定 db_name,使用当前默认db
+        2) 如果使用 LABEL LIKE,则会匹配Stream Load任务的 label 包含 label_matcher 的任务
+        3) 如果使用 LABEL = ,则精确匹配指定的 label
+        4) 如果指定了 STATUS,则匹配 STREAM LOAD 状态
+        5) 可以使用 ORDER BY 对任意列组合进行排序
+        6) 如果指定了 LIMIT,则显示 limit 条匹配记录。否则全部显示
+        7) 如果指定了 OFFSET,则从偏移量offset开始显示查询结果。默认情况下偏移量为0。
+
+## example
+    1. 展示默认 db 的所有Stream Load任务
+        SHOW STREAM LOAD;
+    
+    2. 展示指定 db 的Stream Load任务,label 中包含字符串 "2014_01_02",展示最老的10个
+        SHOW STREAM LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
+        
+    3. 展示指定 db 的Stream Load任务,指定 label 为 "load_example_db_20140102"
+        SHOW STREAM LOAD FROM example_db WHERE LABEL = "load_example_db_20140102";
+        
+    4. 展示指定 db 的Stream Load任务,指定 status 为 "success", 并按 LoadStartTime 降序排序
+        SHOW STREAM LOAD FROM example_db WHERE STATUS = "success" ORDER BY LoadStartTime DESC;
+        
+    5. 展示指定 db 的导入任务 并按 LoadStartTime 降序排序,并从偏移量5开始显示10条查询结果
+        SHOW STREAM LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
+        SHOW STREAM LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;
+
+## keyword
+    SHOW,STREAM LOAD
+
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index bb2801e..ff76913 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -257,7 +257,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A
     KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, KW_RETURNS, KW_RESUME, KW_REVOKE,
     KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, KW_ROWS,
     KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SET_VAR, KW_SHOW, KW_SIGNED,
-    KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STOP, KW_STORAGE, KW_STRING,
+    KW_SMALLINT, KW_SNAPSHOT, KW_SONAME, KW_SPLIT, KW_START, KW_STATUS, KW_STOP, KW_STORAGE, KW_STREAM, KW_STRING,
     KW_SUM, KW_SUPERUSER, KW_SYNC, KW_SYSTEM,
     KW_TABLE, KW_TABLES, KW_TABLET, KW_TASK, KW_TEMPORARY, KW_TERMINATED, KW_THAN, KW_TIME, KW_THEN, KW_TIMESTAMP, KW_TINYINT,
     KW_TO, KW_TRANSACTION, KW_TRIGGERS, KW_TRIM, KW_TRUE, KW_TRUNCATE, KW_TYPE, KW_TYPES,
@@ -2446,6 +2446,11 @@ show_param ::=
     {:
         RESULT = new ShowLoadStmt(db, parser.where, orderByClause, limitClause);
     :}
+    /* Show stream load statement */
+    | KW_STREAM KW_LOAD opt_db:db opt_wild_where order_by_clause:orderByClause limit_clause:limitClause
+    {:
+        RESULT = new ShowStreamLoadStmt(db, parser.where, orderByClause, limitClause);
+    :}
     /* Show export statement */
     | KW_EXPORT opt_db:db opt_wild_where order_by_clause:orderByClause limit_clause:limitClause
     {:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java
new file mode 100644
index 0000000..2afba94
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowStreamLoadStmt.java
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import org.apache.doris.analysis.BinaryPredicate.Operator;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.OrderByPair;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+;
+
+// SHOW STREAM LOAD STATUS statement used to get record of stream load job.
+//
+// syntax:
+//      SHOW STREAM LOAD [FROM db] [LIKE mask]
+public class ShowStreamLoadStmt extends ShowStmt {
+    private static final Logger LOG = LogManager.getLogger(ShowStreamLoadStmt.class);
+
+    public enum StreamLoadState {
+        SUCCESS,
+        FAIL
+    }
+
+    private String dbName;
+    private Expr whereClause;
+    private LimitElement limitElement;
+    private List<OrderByElement> orderByElements;
+
+    private String labelValue;
+    private String stateValue;
+    private boolean isAccurateMatch;
+
+    private ArrayList<OrderByPair> orderByPairs;
+
+    private ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("Label").add("Db").add("Table").add("User")
+            .add("ClientIp").add("Status").add("Message").add("Url").add("TotalRows")
+            .add("LoadedRows").add("FilteredRows").add("UnselectedRows").add("LoadBytes")
+            .add("StartTime").add("FinishTime")
+            .build();
+
+    public ShowStreamLoadStmt(String db, Expr labelExpr, List<OrderByElement> orderByElements, LimitElement limitElement) {
+        this.dbName = db;
+        this.whereClause = labelExpr;
+        this.orderByElements = orderByElements;
+        this.limitElement = limitElement;
+
+        this.labelValue = null;
+        this.stateValue = null;
+        this.isAccurateMatch = false;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public ArrayList<OrderByPair> getOrderByPairs() {
+        return this.orderByPairs;
+    }
+
+    public ArrayList<OrderByPair> getOrderByFinishTime() {
+        ArrayList<OrderByPair> orderByFinishTime = new ArrayList<OrderByPair>();
+        int index = 0;
+        try {
+            index = analyzeColumn("FinishTime");
+        } catch (AnalysisException e) {
+        }
+        OrderByPair orderByPair = new OrderByPair(index, false);
+        orderByFinishTime.add(orderByPair);
+        return orderByFinishTime;
+    }
+
+    public long getLimit() {
+        if (limitElement != null && limitElement.hasLimit()) {
+            return limitElement.getLimit();
+        }
+        return -1L;
+    }
+
+    public long getOffset() {
+        if (limitElement != null && limitElement.hasOffset()) {
+            return limitElement.getOffset();
+        }
+        return -1L;
+    }
+
+    public String getLabelValue() {
+        return this.labelValue;
+    }
+
+    public StreamLoadState getState() {
+        if (Strings.isNullOrEmpty(stateValue)) {
+            return null;
+        }
+
+        StreamLoadState state = null;
+        try {
+            state = StreamLoadState.valueOf(stateValue);
+        } catch (Exception e) {
+        }
+        return state;
+    }
+
+    public boolean isAccurateMatch() {
+        return isAccurateMatch;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        if (Strings.isNullOrEmpty(dbName)) {
+            dbName = analyzer.getDefaultDb();
+            if (Strings.isNullOrEmpty(dbName)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+        } else {
+            dbName = ClusterNamespace.getFullName(getClusterName(), dbName);
+        }
+
+        // analyze where clause if not null
+        if (whereClause != null) {
+            if (whereClause instanceof CompoundPredicate) {
+                CompoundPredicate cp = (CompoundPredicate) whereClause;
+                if (cp.getOp() != CompoundPredicate.Operator.AND) {
+                    throw new AnalysisException("Only allow compound predicate with operator AND");
+                }
+
+                analyzeSubPredicate(cp.getChild(0));
+                analyzeSubPredicate(cp.getChild(1));
+            } else {
+                analyzeSubPredicate(whereClause);
+            }
+        }
+
+        // order by
+        if (orderByElements != null && !orderByElements.isEmpty()) {
+            orderByPairs = new ArrayList<OrderByPair>();
+            for (OrderByElement orderByElement : orderByElements) {
+                if (!(orderByElement.getExpr() instanceof SlotRef)) {
+                    throw new AnalysisException("Should order by column");
+                }
+                SlotRef slotRef = (SlotRef) orderByElement.getExpr();
+                int index = analyzeColumn(slotRef.getColumnName());
+                OrderByPair orderByPair = new OrderByPair(index, !orderByElement.getIsAsc());
+                orderByPairs.add(orderByPair);
+            }
+        }
+    }
+
+    private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {
+        if (subExpr == null) {
+            return;
+        }
+
+        boolean valid = true;
+        boolean hasLabel = false;
+        boolean hasState = false;
+
+        CHECK: {
+            if (subExpr instanceof BinaryPredicate) {
+                BinaryPredicate binaryPredicate = (BinaryPredicate) subExpr;
+                if (binaryPredicate.getOp() != Operator.EQ) {
+                    valid = false;
+                    break CHECK;
+                }
+            } else if (subExpr instanceof LikePredicate) {
+                LikePredicate likePredicate = (LikePredicate) subExpr;
+                if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
+                    valid = false;
+                    break CHECK;
+                }
+            } else {
+                valid = false;
+                break CHECK;
+            }
+
+            // left child
+            if (!(subExpr.getChild(0) instanceof SlotRef)) {
+                valid = false;
+                break CHECK;
+            }
+            String leftKey = ((SlotRef) subExpr.getChild(0)).getColumnName();
+            if (leftKey.equalsIgnoreCase("label")) {
+                hasLabel = true;
+            } else if (leftKey.equalsIgnoreCase("status")) {
+                hasState = true;
+            } else {
+                valid = false;
+                break CHECK;
+            }
+
+            if (hasState && !(subExpr instanceof BinaryPredicate)) {
+                valid = false;
+                break CHECK;
+            }
+
+            if (hasLabel && subExpr instanceof BinaryPredicate) {
+                isAccurateMatch = true;
+            }
+
+            // right child
+            if (!(subExpr.getChild(1) instanceof StringLiteral)) {
+                valid = false;
+                break CHECK;
+            }
+
+            String value = ((StringLiteral) subExpr.getChild(1)).getStringValue();
+            if (Strings.isNullOrEmpty(value)) {
+                valid = false;
+                break CHECK;
+            }
+
+            if (hasLabel) {
+                labelValue = value;
+            } else if (hasState) {
+                stateValue = value.toUpperCase();
+
+                try {
+                    StreamLoadState.valueOf(stateValue);
+                } catch (Exception e) {
+                    valid = false;
+                    break CHECK;
+                }
+            }
+        }
+
+        if (!valid) {
+            throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\","
+                    + " or LABEL LIKE \"matcher\", " + " or STATUS = \"SUCCESS|FAIL\", "
+                    + " or compound predicate with operator AND");
+        }
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SHOW STREAM LOAD ");
+        if (!Strings.isNullOrEmpty(dbName)) {
+            sb.append("FROM `").append(dbName).append("`");
+        }
+
+        if (whereClause != null) {
+            sb.append(" WHERE ").append(whereClause.toSql());
+        }
+
+        // Order By clause
+        if (orderByElements != null) {
+            sb.append(" ORDER BY ");
+            for (int i = 0; i < orderByElements.size(); ++i) {
+                sb.append(orderByElements.get(i).getExpr().toSql());
+                sb.append((orderByElements.get(i).getIsAsc()) ? " ASC" : " DESC");
+                sb.append((i + 1 != orderByElements.size()) ? ", " : "");
+            }
+        }
+
+        if (getLimit() != -1L) {
+            sb.append(" LIMIT ").append(getLimit());
+        }
+
+        if (getOffset() != -1L) {
+            sb.append(" OFFSET ").append(getOffset());
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+
+    private int analyzeColumn(String columnName) throws AnalysisException {
+        for (String title : TITLE_NAMES) {
+            if (title.equalsIgnoreCase(columnName)) {
+                return TITLE_NAMES.indexOf(title);
+            }
+        }
+
+        throw new AnalysisException("Title name[" + columnName + "] does not exist");
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 4a14c20..9d177e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -600,6 +600,18 @@ public class Config extends ConfigBase {
     public static int fetch_stream_load_record_interval_second = 120;
 
     /**
+     * Default max number of recent stream load record that can be stored in memory.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int max_stream_load_record_size = 5000;
+
+    /**
+     * Whether to disable show stream load and clear stream load records in memory.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean disable_show_stream_load = false;
+
+    /**
      * maximum concurrent running txn num including prepare, commit txns under a single db
      * txn manager will reject coming txns
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java
new file mode 100644
index 0000000..3c4bf8d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecord.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class StreamLoadRecord {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecord.class);
+
+    private String label;
+    private String db;
+    private String table;
+    private String user;
+    private String clientIp;
+    private String status;
+    private String message;
+    private String url;
+    private String totalRows;
+    private String loadedRows;
+    private String filteredRows;
+    private String unselectedRows;
+    private String loadBytes;
+    private String startTime;
+    private String finishTime;
+
+
+    public StreamLoadRecord(String label, String db, String table, String user, String clientIp, String status, String message, String url, String totalRows, String loadedRows, String filteredRows, String unselectedRows, String loadBytes, String startTime, String finishTime) {
+        this.label = label;
+        this.db = db;
+        this.table = table;
+        this.user = user;
+        this.clientIp = clientIp;
+        this.status = status;
+        this.message = message;
+        this.url = url;
+        this.totalRows = totalRows;
+        this.loadedRows = loadedRows;
+        this.filteredRows = filteredRows;
+        this.unselectedRows = unselectedRows;
+        this.loadBytes = loadBytes;
+        this.startTime = startTime;
+        this.finishTime = finishTime;
+    }
+
+    public List<Comparable> getStreamLoadInfo() {
+        List<Comparable> streamLoadInfo = Lists.newArrayList();
+        streamLoadInfo.add(this.label);
+        streamLoadInfo.add(this.db);
+        streamLoadInfo.add(this.table);
+        streamLoadInfo.add(this.user);
+        streamLoadInfo.add(this.clientIp);
+        streamLoadInfo.add(this.status);
+        streamLoadInfo.add(this.message);
+        streamLoadInfo.add(this.url);
+        streamLoadInfo.add(this.totalRows);
+        streamLoadInfo.add(this.loadedRows);
+        streamLoadInfo.add(this.filteredRows);
+        streamLoadInfo.add(this.unselectedRows);
+        streamLoadInfo.add(this.loadBytes);
+        streamLoadInfo.add(this.startTime);
+        streamLoadInfo.add(this.finishTime);
+        return streamLoadInfo;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public String getFinishTime() {
+        return this.finishTime;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
index b137566..9623a0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java
@@ -17,8 +17,15 @@
 
 package org.apache.doris.load;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.doris.analysis.ShowStreamLoadStmt.StreamLoadState;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.TimeUtils;
@@ -26,6 +33,7 @@ import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.plugin.StreamLoadAuditEvent;
 import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStreamLoadRecord;
@@ -40,15 +48,168 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.stream.Collectors;
 
 public class StreamLoadRecordMgr extends MasterDaemon {
     private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
 
+    private class StreamLoadItem {
+        private String label;
+        private long dbId;
+        private String finishTime;
+
+        public StreamLoadItem(String label, long dbId, String finishTime) {
+            this.label = label;
+            this.dbId = dbId;
+            this.finishTime = finishTime;
+        }
+
+        public String getLabel() {
+            return label;
+        }
+
+        public long getDbId() {
+            return dbId;
+        }
+
+        public String getFinishTime() {
+            return finishTime;
+        }
+    }
+
+    class StreamLoadComparator implements Comparator<StreamLoadItem> {
+        public int compare(StreamLoadItem s1, StreamLoadItem s2) {
+            return s1.getFinishTime().compareTo(s2.getFinishTime());
+        }
+    }
+
+    Queue<StreamLoadItem> streamLoadRecordHeap = new PriorityQueue<>(new StreamLoadComparator());
+    private Map<Long, Map<String, StreamLoadRecord>> dbIdToLabelToStreamLoadRecord = Maps.newConcurrentMap();
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+
     public StreamLoadRecordMgr(String name, long intervalMs) {
             super(name, intervalMs);
     }
 
+    public void addStreamLoadRecord(long dbId, String label, StreamLoadRecord streamLoadRecord) {
+        writeLock();
+        while (isQueueFull()) {
+            StreamLoadItem record = streamLoadRecordHeap.poll();
+            if (record != null) {
+                String de_label = record.getLabel();
+                long de_dbId = record.getDbId();
+
+                Map<String, StreamLoadRecord> labelToStreamLoadRecord = dbIdToLabelToStreamLoadRecord.get(de_dbId);
+                Iterator<Map.Entry<String, StreamLoadRecord>> iter_record = labelToStreamLoadRecord.entrySet().iterator();
+                while (iter_record.hasNext()) {
+                    String labelInMap = iter_record.next().getKey();
+                    if (labelInMap.equals(de_label)) {
+                        iter_record.remove();
+                        break;
+                    }
+                }
+            }
+        }
+
+        StreamLoadItem record = new StreamLoadItem(label, dbId, streamLoadRecord.getFinishTime());
+        streamLoadRecordHeap.offer(record);
+
+        if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
+            dbIdToLabelToStreamLoadRecord.put(dbId, new ConcurrentHashMap<>());
+        }
+        Map<String, StreamLoadRecord> labelToStreamLoadRecord = dbIdToLabelToStreamLoadRecord.get(dbId);
+        if (!labelToStreamLoadRecord.containsKey(label)) {
+            labelToStreamLoadRecord.put(label, streamLoadRecord);
+        }
+        writeUnlock();
+    }
+
+    public List<List<Comparable>> getStreamLoadRecordByDb(long dbId, String label, boolean accurateMatch, StreamLoadState state) {
+        LinkedList<List<Comparable>> streamLoadRecords = new LinkedList<List<Comparable>>();
+
+        readLock();
+        try {
+            if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
+                return streamLoadRecords;
+            }
+
+            List<StreamLoadRecord> streamLoadRecordList = Lists.newArrayList();
+            Map<String, StreamLoadRecord> labelToStreamLoadRecord = dbIdToLabelToStreamLoadRecord.get(dbId);
+            if (Strings.isNullOrEmpty(label)) {
+                streamLoadRecordList.addAll(labelToStreamLoadRecord.values().stream().collect(Collectors.toList()));
+            } else {
+                // check label value
+                if (accurateMatch) {
+                    if (!labelToStreamLoadRecord.containsKey(label)) {
+                        return streamLoadRecords;
+                    }
+                    streamLoadRecordList.add(labelToStreamLoadRecord.get(label));
+                } else {
+                    // non-accurate match
+                    for (Map.Entry<String, StreamLoadRecord> entry : labelToStreamLoadRecord.entrySet()) {
+                        if (entry.getKey().contains(label)) {
+                            streamLoadRecordList.add(entry.getValue());
+                        }
+                    }
+                }
+            }
+
+            for (StreamLoadRecord streamLoadRecord : streamLoadRecordList) {
+                try {
+                    if (state != null && !String.valueOf(state).equalsIgnoreCase(streamLoadRecord.getStatus())) {
+                        continue;
+                    }
+                    streamLoadRecords.add(streamLoadRecord.getStreamLoadInfo());
+                } catch (Exception e) {
+                    continue;
+                }
+
+            }
+            return streamLoadRecords;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    public void clearStreamLoadRecord() {
+        writeLock();
+        if (streamLoadRecordHeap.size() > 0 || dbIdToLabelToStreamLoadRecord.size() > 0) {
+            streamLoadRecordHeap.clear();
+            dbIdToLabelToStreamLoadRecord.clear();
+        }
+        writeUnlock();
+    }
+
+    public boolean isQueueFull() {
+        return streamLoadRecordHeap.size() >= Config.max_stream_load_record_size;
+    }
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
     @Override
     protected void runAfterCatalogReady() {
         ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
@@ -88,7 +249,34 @@ public class StreamLoadRecordMgr extends MasterDaemon {
                     if (entry.getValue().getFinishTime() > lastStreamLoadTime) {
                         lastStreamLoadTime = entry.getValue().getFinishTime();
                     }
+
+                    if (Config.disable_show_stream_load) {
+                        continue;
+                    }
+                    StreamLoadRecord streamLoadRecord = new StreamLoadRecord(streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(),
+                            streamLoadItem.getUser(), streamLoadItem.getUserIp(), streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
+                            String.valueOf(streamLoadItem.getTotalRows()), String.valueOf(streamLoadItem.getLoadedRows()),
+                            String.valueOf(streamLoadItem.getFilteredRows()), String.valueOf(streamLoadItem.getUnselectedRows()),
+                            String.valueOf(streamLoadItem.getLoadBytes()), startTime, finishTime);
+
+                    String cluster = streamLoadItem.getCluster();
+                    if (Strings.isNullOrEmpty(cluster)) {
+                        cluster = SystemInfoService.DEFAULT_CLUSTER;
+                    }
+
+                    String fullDbName = ClusterNamespace.getFullName(cluster, streamLoadItem.getDb());
+                    Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
+                    if (db == null) {
+                        String dbName = fullDbName;
+                        if (Strings.isNullOrEmpty(streamLoadItem.getCluster())) {
+                            dbName = streamLoadItem.getDb();
+                        }
+                        throw new UserException("unknown database, database=" + dbName);
+                    }
+                    long dbId = db.getId();
+                    Catalog.getCurrentCatalog().getStreamLoadRecordMgr().addStreamLoadRecord(dbId, streamLoadItem.getLabel(), streamLoadRecord);
                 }
+
                 if (streamLoadRecordBatch.size() > 0) {
                     backend.setLastStreamLoadTime(lastStreamLoadTime);
                     beIdToLastStreamLoad.put(backend.getId(), lastStreamLoadTime);
@@ -113,6 +301,10 @@ public class StreamLoadRecordMgr extends MasterDaemon {
             FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(beIdToLastStreamLoad);
             Catalog.getCurrentCatalog().getEditLog().logFetchStreamLoadRecord(fetchStreamLoadRecord);
         }
+
+        if (Config.disable_show_stream_load) {
+            Catalog.getCurrentCatalog().getStreamLoadRecordMgr().clearStreamLoadRecord();
+        }
     }
 
     public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index cc07ca5..ec42fc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -62,6 +62,7 @@ import org.apache.doris.analysis.ShowRoutineLoadTaskStmt;
 import org.apache.doris.analysis.ShowSmallFilesStmt;
 import org.apache.doris.analysis.ShowSnapshotStmt;
 import org.apache.doris.analysis.ShowStmt;
+import org.apache.doris.analysis.ShowStreamLoadStmt;
 import org.apache.doris.analysis.ShowTableStatusStmt;
 import org.apache.doris.analysis.ShowTableStmt;
 import org.apache.doris.analysis.ShowTabletStmt;
@@ -209,6 +210,8 @@ public class ShowExecutor {
             handleShowColumn();
         } else if (stmt instanceof ShowLoadStmt) {
             handleShowLoad();
+        } else if (stmt instanceof ShowStreamLoadStmt) {
+                     handleShowStreamLoad();
         } else if (stmt instanceof ShowLoadWarningsStmt) {
             handleShowLoadWarnings();
         } else if (stmt instanceof ShowRoutineLoadStmt) {
@@ -843,6 +846,59 @@ public class ShowExecutor {
         resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
     }
 
+    // Show stream load statement.
+    private void handleShowStreamLoad() throws AnalysisException {
+        ShowStreamLoadStmt showStmt = (ShowStreamLoadStmt) stmt;
+
+        Catalog catalog = Catalog.getCurrentCatalog();
+        Database db = catalog.getDb(showStmt.getDbName());
+        if (db == null) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
+        }
+        long dbId = db.getId();
+
+        List<List<Comparable>> streamLoadRecords = catalog.getStreamLoadRecordMgr().getStreamLoadRecordByDb(dbId, showStmt.getLabelValue(), showStmt.isAccurateMatch(), showStmt.getState());
+
+        // order the result of List<StreamLoadRecord> by orderByPairs in show stmt
+        List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();
+        if (orderByPairs == null) {
+            orderByPairs = showStmt.getOrderByFinishTime();
+        }
+        ListComparator<List<Comparable>> comparator = null;
+        if (orderByPairs != null) {
+            OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
+            comparator = new ListComparator<List<Comparable>>(orderByPairs.toArray(orderByPairArr));
+        } else {
+            // sort by id asc
+            comparator = new ListComparator<List<Comparable>>(0);
+        }
+        Collections.sort(streamLoadRecords, comparator);
+
+        List<List<String>> rows = Lists.newArrayList();
+        for (List<Comparable> streamLoadRecord : streamLoadRecords) {
+            List<String> oneInfo = new ArrayList<String>(streamLoadRecord.size());
+
+            for (Comparable element : streamLoadRecord) {
+                oneInfo.add(element.toString());
+            }
+            rows.add(oneInfo);
+        }
+
+        // filter by limit
+        long limit = showStmt.getLimit();
+        long offset = showStmt.getOffset() == -1L ? 0 : showStmt.getOffset();
+        if (offset >= rows.size()) {
+            rows = Lists.newArrayList();
+        } else if (limit != -1L) {
+            if ((limit + offset) < rows.size()) {
+                rows = rows.subList((int) offset, (int) (limit + offset));
+            } else {
+                rows = rows.subList((int) offset, rows.size());
+            }
+        }
+        resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+    }
+
     private void handleShowLoadWarnings() throws AnalysisException {
         ShowLoadWarningsStmt showWarningsStmt = (ShowLoadWarningsStmt) stmt;
 
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index e4a1e45..65c7789 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -344,6 +344,7 @@ import org.apache.doris.qe.SqlModeHelper;
         keywordMap.put("status", new Integer(SqlParserSymbols.KW_STATUS));
         keywordMap.put("stop", new Integer(SqlParserSymbols.KW_STOP));
         keywordMap.put("storage", new Integer(SqlParserSymbols.KW_STORAGE));
+        keywordMap.put("stream", new Integer(SqlParserSymbols.KW_STREAM));
         keywordMap.put("string", new Integer(SqlParserSymbols.KW_STRING));
         keywordMap.put("sum", new Integer(SqlParserSymbols.KW_SUM));
         keywordMap.put("superuser", new Integer(SqlParserSymbols.KW_SUPERUSER));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org