You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/22 15:47:25 UTC

[doris] branch dev-1.0.1 updated: [fix](partition-cache) fix result may not write when enable partition cache (#10319)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new cc7da636db [fix](partition-cache) fix result may not write when enable partition cache (#10319)
cc7da636db is described below

commit cc7da636db0a680728ff7e951dff7cfe03c060f2
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Jun 22 23:42:46 2022 +0800

    [fix](partition-cache) fix result may not write when enable partition cache (#10319)
    
    Fix query results possibly not being written when the partition cache is enabled.
---
 .../java/org/apache/doris/qe/ConnectProcessor.java |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 76 ++++++++------------
 .../org/apache/doris/qe/cache/CacheAnalyzer.java   |  6 ++
 .../data/query/cache/partition_cache.out           | 14 ++++
 .../suites/query/cache/partition_cache.groovy      | 80 ++++++++++++++++++++++
 5 files changed, 128 insertions(+), 50 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 87f2f9f294..a0da3f94f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -386,7 +386,7 @@ public class ConnectProcessor {
             if (resultSet == null) {
                 packet = executor.getOutputPacket();
             } else {
-                executor.sendResult(resultSet);
+                executor.sendResultSet(resultSet);
                 packet = getResultPacket();
                 if (packet == null) {
                     LOG.debug("packet == null");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ec36508e0f..2f1124234d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -837,8 +837,8 @@ public class StmtExecutor implements ProfileWriter {
     /**
      * Handle the SelectStmt via Cache.
      */
-    private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt) throws Exception {
-        RowBatch batch = null;
+    private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt)
+            throws Exception {
         InternalService.PFetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
         CacheMode mode = cacheAnalyzer.getCacheMode();
         SelectStmt newSelectStmt = selectStmt;
@@ -862,44 +862,7 @@ public class StmtExecutor implements ProfileWriter {
                 planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift());
             }
         }
-
-        coord = new Coordinator(context, analyzer, planner);
-        QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
-                new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
-        coord.exec();
-
-        while (true) {
-            batch = coord.getNext();
-            if (batch.getBatch() != null) {
-                cacheAnalyzer.copyRowBatch(batch);
-                if (!isSendFields) {
-                    sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
-                    isSendFields = true;
-                }
-                for (ByteBuffer row : batch.getBatch().getRows()) {
-                    channel.sendOnePacket(row);
-                }
-                context.updateReturnRows(batch.getBatch().getRows().size());
-            }
-            if (batch.isEos()) {
-                break;
-            }
-        }
-
-        if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
-            isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
-        }
-
-        cacheAnalyzer.updateCache();
-
-        if (!isSendFields) {
-            sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
-            isSendFields = true;
-        }
-
-        statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
-        context.getState().setEof();
-        return;
+        sendResult(false, isSendFields, newSelectStmt, channel, cacheAnalyzer, cacheResult);
     }
 
     private boolean handleSelectRequestInFe(SelectStmt parsedSelectStmt) throws IOException {
@@ -921,7 +884,7 @@ public class StmtExecutor implements ProfileWriter {
             }
         }
         ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
-        sendResult(resultSet);
+        sendResultSet(resultSet);
         return true;
     }
 
@@ -955,7 +918,7 @@ public class StmtExecutor implements ProfileWriter {
             return;
         }
 
-        RowBatch batch;
+
         MysqlChannel channel = context.getMysqlChannel();
         boolean isOutfileQuery = queryStmt.hasOutFileClause();
 
@@ -965,8 +928,12 @@ public class StmtExecutor implements ProfileWriter {
             handleCacheStmt(cacheAnalyzer, channel, (SelectStmt) queryStmt);
             return;
         }
+        sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
+    }
+
 
-        // send result
+    private void sendResult(boolean isOutfileQuery, boolean isSendFields, QueryStmt queryStmt, MysqlChannel channel,
+            CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception {
         // 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
         //    We will not send real query result to client. Instead, we only send OK to client with
         //    number of rows selected. For example:
@@ -974,7 +941,7 @@ public class StmtExecutor implements ProfileWriter {
         //          Query OK, 10 rows affected (0.01 sec)
         //
         // 2. If this is a query, send the result expr fields first, and send result data back to client.
-        boolean isSendFields = false;
+        RowBatch batch;
         coord = new Coordinator(context, analyzer, planner);
         QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                 new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
@@ -986,7 +953,11 @@ public class StmtExecutor implements ProfileWriter {
             batch = coord.getNext();
             // for outfile query, there will be only one empty batch send back with eos flag
             if (batch.getBatch() != null) {
-                // For some language driver, getting error packet after fields packet will be recognized as a success result
+                if (cacheAnalyzer != null) {
+                    cacheAnalyzer.copyRowBatch(batch);
+                }
+                // For some language driver, getting error packet after fields packet
+                // will be recognized as a success result
                 // so We need to send fields after first batch arrived
                 if (!isSendFields) {
                     if (!isOutfileQuery) {
@@ -1005,6 +976,14 @@ public class StmtExecutor implements ProfileWriter {
                 break;
             }
         }
+        if (cacheAnalyzer != null) {
+            if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
+                isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt,
+                        isSendFields, false);
+            }
+
+            cacheAnalyzer.updateCache();
+        }
         if (!isSendFields) {
             if (!isOutfileQuery) {
                 sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
@@ -1018,7 +997,6 @@ public class StmtExecutor implements ProfileWriter {
         plannerProfile.setQueryFetchResultFinishTime();
     }
 
-
     private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
         TWaitingTxnStatusResult statusResult = null;
         if (Catalog.getCurrentCatalog().isMaster()) {
@@ -1469,7 +1447,7 @@ public class StmtExecutor implements ProfileWriter {
         context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
     }
 
-    public void sendResult(ResultSet resultSet) throws IOException {
+    public void sendResultSet(ResultSet resultSet) throws IOException {
         context.updateReturnRows(resultSet.getResultRows().size());
         // Send meta data.
         sendMetaData(resultSet.getMetaData());
@@ -1503,7 +1481,7 @@ public class StmtExecutor implements ProfileWriter {
             return;
         }
 
-        sendResult(resultSet);
+        sendResultSet(resultSet);
     }
 
     private void handleUnlockTablesStmt() {
@@ -1569,7 +1547,7 @@ public class StmtExecutor implements ProfileWriter {
             // create table
             DdlExecutor.execute(context.getCatalog(), ctasStmt);
             context.getState().setOk();
-        }  catch (Exception e) {
+        } catch (Exception e) {
             // Maybe our bug
             LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
             context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index 480650e4e8..a4e4844f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -208,6 +208,12 @@ public class CacheAnalyzer {
                 LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
                 return CacheMode.None;
             }
+            if (enablePartitionCache() && ((OlapScanNode) node).getSelectedPartitionNum() > 1
+                    && selectStmt.hasGroupByClause()) {
+                LOG.debug("more than one partition scanned when qeury has agg, partition cache cannot use, queryid {}",
+                        DebugUtil.printId(queryId));
+                return CacheMode.None;
+            }
             CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node);
             tblTimeList.add(cTable);
         }
diff --git a/regression-test/data/query/cache/partition_cache.out b/regression-test/data/query/cache/partition_cache.out
new file mode 100644
index 0000000000..04875e2448
--- /dev/null
+++ b/regression-test/data/query/cache/partition_cache.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !partition_cache --
+2022-05-28	0
+2022-05-29	0
+2022-05-30	0
+2022-06-01	0
+2022-06-02	0
+
+-- !partition_cache --
+2022-05-28	0
+2022-05-29	0
+2022-05-30	0
+2022-06-01	0
+2022-06-02	0
diff --git a/regression-test/suites/query/cache/partition_cache.groovy b/regression-test/suites/query/cache/partition_cache.groovy
new file mode 100644
index 0000000000..206708a2a2
--- /dev/null
+++ b/regression-test/suites/query/cache/partition_cache.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+suite("partition_cache") { // regression test for #10319: results must be returned when partition cache is enabled
+    def tableName = "test_partition_cache"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+              `k1` date NOT NULL COMMENT "",
+              `k2` int(11) NOT NULL COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT "OLAP"
+            PARTITION BY RANGE(`k1`)
+            (PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')),
+            PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01')))
+            DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 32
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+            )
+        """  // two monthly range partitions on k1: p202205 and p202206
+    sql """ INSERT INTO ${tableName} VALUES 
+                    ("2022-05-27",0),
+                    ("2022-05-28",0),
+                    ("2022-05-29",0),
+                    ("2022-05-30",0),
+                    ("2022-06-01",0),
+                    ("2022-06-02",0)
+        """  // 2022-05-27 lies outside the k1 range queried below, so it must not appear in the output
+    sql " set enable_partition_cache=true "  // turn on the partition cache for the queries below
+
+    qt_partition_cache """
+                        select
+                          k1,
+                          sum(k2) as total_pv 
+                        from
+                          ${tableName} 
+                        where
+                          k1 between '2022-05-28' and '2022-06-30' 
+                        group by
+                          k1 
+                        order by
+                          k1;
+                       """  // NOTE(review): spans p202205 and p202206 with GROUP BY, so the new CacheAnalyzer guard should pick CacheMode.None - confirm
+    qt_partition_cache """
+                        select
+                          k1,
+                          sum(k2) as total_pv 
+                        from
+                          ${tableName} 
+                        where
+                          k1 between '2022-05-28' and '2022-06-30' 
+                        group by
+                          k1 
+                        order by
+                          k1;
+                       """  // same query run a second time; the expected output is identical to the first run
+    sql " set enable_partition_cache=false "  // NOTE(review): not in a finally block, so a failure above leaves the switch on
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org