You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/22 15:42:54 UTC
[doris] branch master updated: [fix](partition-cache) fix result may not write when enable partition cache (#10319)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a2b5020375 [fix](partition-cache) fix result may not write when enable partition cache (#10319)
a2b5020375 is described below
commit a2b50203750e6ed375cf525a2531916ad5040499
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Wed Jun 22 23:42:46 2022 +0800
[fix](partition-cache) fix result may not write when enable partition cache (#10319)
Fix a bug where the query result may not be written when the partition cache is enabled.
---
.../java/org/apache/doris/qe/ConnectProcessor.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 74 +++++++-------------
.../org/apache/doris/qe/cache/CacheAnalyzer.java | 6 ++
.../data/query/cache/partition_cache.out | 14 ++++
.../suites/query/cache/partition_cache.groovy | 80 ++++++++++++++++++++++
5 files changed, 126 insertions(+), 50 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 391ebf0164..512b8c61f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -390,7 +390,7 @@ public class ConnectProcessor {
if (resultSet == null) {
packet = executor.getOutputPacket();
} else {
- executor.sendResult(resultSet);
+ executor.sendResultSet(resultSet);
packet = getResultPacket();
if (packet == null) {
LOG.debug("packet == null");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index b93573689d..0fe91b9dea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -858,9 +858,8 @@ public class StmtExecutor implements ProfileWriter {
/**
* Handle the SelectStmt via Cache.
*/
- private void handleCacheStmt(CacheAnalyzer cacheAnalyzer,
- MysqlChannel channel, SelectStmt selectStmt) throws Exception {
- RowBatch batch = null;
+ private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel, SelectStmt selectStmt)
+ throws Exception {
InternalService.PFetchCacheResult cacheResult = cacheAnalyzer.getCacheData();
CacheMode mode = cacheAnalyzer.getCacheMode();
SelectStmt newSelectStmt = selectStmt;
@@ -885,44 +884,7 @@ public class StmtExecutor implements ProfileWriter {
planner.plan(newSelectStmt, analyzer, context.getSessionVariable().toThrift());
}
}
-
- coord = new Coordinator(context, analyzer, planner);
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
- new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
- coord.exec();
-
- while (true) {
- batch = coord.getNext();
- if (batch.getBatch() != null) {
- cacheAnalyzer.copyRowBatch(batch);
- if (!isSendFields) {
- sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
- isSendFields = true;
- }
- for (ByteBuffer row : batch.getBatch().getRows()) {
- channel.sendOnePacket(row);
- }
- context.updateReturnRows(batch.getBatch().getRows().size());
- }
- if (batch.isEos()) {
- break;
- }
- }
-
- if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
- isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), newSelectStmt, isSendFields, false);
- }
-
- cacheAnalyzer.updateCache();
-
- if (!isSendFields) {
- sendFields(newSelectStmt.getColLabels(), exprToType(newSelectStmt.getResultExprs()));
- isSendFields = true;
- }
-
- statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder();
- context.getState().setEof();
- return;
+ sendResult(false, isSendFields, newSelectStmt, channel, cacheAnalyzer, cacheResult);
}
private boolean handleSelectRequestInFe(SelectStmt parsedSelectStmt) throws IOException {
@@ -944,7 +906,7 @@ public class StmtExecutor implements ProfileWriter {
}
}
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
- sendResult(resultSet);
+ sendResultSet(resultSet);
return true;
}
@@ -978,7 +940,7 @@ public class StmtExecutor implements ProfileWriter {
return;
}
- RowBatch batch;
+
MysqlChannel channel = context.getMysqlChannel();
boolean isOutfileQuery = queryStmt.hasOutFileClause();
@@ -988,8 +950,12 @@ public class StmtExecutor implements ProfileWriter {
handleCacheStmt(cacheAnalyzer, channel, (SelectStmt) queryStmt);
return;
}
+ sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
+ }
+
- // send result
+ private void sendResult(boolean isOutfileQuery, boolean isSendFields, QueryStmt queryStmt, MysqlChannel channel,
+ CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception {
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
// We will not send real query result to client. Instead, we only send OK to client with
// number of rows selected. For example:
@@ -997,7 +963,7 @@ public class StmtExecutor implements ProfileWriter {
// Query OK, 10 rows affected (0.01 sec)
//
// 2. If this is a query, send the result expr fields first, and send result data back to client.
- boolean isSendFields = false;
+ RowBatch batch;
coord = new Coordinator(context, analyzer, planner);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
@@ -1009,6 +975,9 @@ public class StmtExecutor implements ProfileWriter {
batch = coord.getNext();
// for outfile query, there will be only one empty batch send back with eos flag
if (batch.getBatch() != null) {
+ if (cacheAnalyzer != null) {
+ cacheAnalyzer.copyRowBatch(batch);
+ }
// For some language driver, getting error packet after fields packet
// will be recognized as a success result
// so We need to send fields after first batch arrived
@@ -1029,6 +998,14 @@ public class StmtExecutor implements ProfileWriter {
break;
}
}
+ if (cacheAnalyzer != null) {
+ if (cacheResult != null && cacheAnalyzer.getHitRange() == Cache.HitRange.Right) {
+ isSendFields = sendCachedValues(channel, cacheResult.getValuesList(), (SelectStmt) queryStmt,
+ isSendFields, false);
+ }
+
+ cacheAnalyzer.updateCache();
+ }
if (!isSendFields) {
if (!isOutfileQuery) {
sendFields(queryStmt.getColLabels(), exprToType(queryStmt.getResultExprs()));
@@ -1042,7 +1019,6 @@ public class StmtExecutor implements ProfileWriter {
plannerProfile.setQueryFetchResultFinishTime();
}
-
private TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest request) throws Exception {
TWaitingTxnStatusResult statusResult = null;
if (Catalog.getCurrentCatalog().isMaster()) {
@@ -1495,7 +1471,7 @@ public class StmtExecutor implements ProfileWriter {
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
- public void sendResult(ResultSet resultSet) throws IOException {
+ public void sendResultSet(ResultSet resultSet) throws IOException {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
sendMetaData(resultSet.getMetaData());
@@ -1529,7 +1505,7 @@ public class StmtExecutor implements ProfileWriter {
return;
}
- sendResult(resultSet);
+ sendResultSet(resultSet);
}
private void handleUnlockTablesStmt() {
@@ -1595,7 +1571,7 @@ public class StmtExecutor implements ProfileWriter {
// create table
DdlExecutor.execute(context.getCatalog(), ctasStmt);
context.getState().setOk();
- } catch (Exception e) {
+ } catch (Exception e) {
// Maybe our bug
LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index 51ae3e2977..380e515bef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -208,6 +208,12 @@ public class CacheAnalyzer {
LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
+ if (enablePartitionCache() && ((OlapScanNode) node).getSelectedPartitionNum() > 1
+ && selectStmt.hasGroupByClause()) {
+ LOG.debug("more than one partition scanned when qeury has agg, partition cache cannot use, queryid {}",
+ DebugUtil.printId(queryId));
+ return CacheMode.None;
+ }
CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node);
tblTimeList.add(cTable);
}
diff --git a/regression-test/data/query/cache/partition_cache.out b/regression-test/data/query/cache/partition_cache.out
new file mode 100644
index 0000000000..04875e2448
--- /dev/null
+++ b/regression-test/data/query/cache/partition_cache.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !partition_cache --
+2022-05-28 0
+2022-05-29 0
+2022-05-30 0
+2022-06-01 0
+2022-06-02 0
+
+-- !partition_cache --
+2022-05-28 0
+2022-05-29 0
+2022-05-30 0
+2022-06-01 0
+2022-06-02 0
diff --git a/regression-test/suites/query/cache/partition_cache.groovy b/regression-test/suites/query/cache/partition_cache.groovy
new file mode 100644
index 0000000000..206708a2a2
--- /dev/null
+++ b/regression-test/suites/query/cache/partition_cache.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+suite("partition_cache") {
+ def tableName = "test_partition_cache"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` date NOT NULL COMMENT "",
+ `k2` int(11) NOT NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT "OLAP"
+ PARTITION BY RANGE(`k1`)
+ (PARTITION p202205 VALUES [('2022-05-01'), ('2022-06-01')),
+ PARTITION p202206 VALUES [('2022-06-01'), ('2022-07-01')))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 32
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2"
+ )
+ """
+ sql """ INSERT INTO ${tableName} VALUES
+ ("2022-05-27",0),
+ ("2022-05-28",0),
+ ("2022-05-29",0),
+ ("2022-05-30",0),
+ ("2022-06-01",0),
+ ("2022-06-02",0)
+ """
+ sql " set enable_partition_cache=true "
+
+ qt_partition_cache """
+ select
+ k1,
+ sum(k2) as total_pv
+ from
+ ${tableName}
+ where
+ k1 between '2022-05-28' and '2022-06-30'
+ group by
+ k1
+ order by
+ k1;
+ """
+ qt_partition_cache """
+ select
+ k1,
+ sum(k2) as total_pv
+ from
+ ${tableName}
+ where
+ k1 between '2022-05-28' and '2022-06-30'
+ group by
+ k1
+ order by
+ k1;
+ """
+ sql " set enable_partition_cache=false "
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org