You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/23 11:26:13 UTC
(doris) branch master updated: [feature](merge-cloud) Support cloud snapshot version (#30232)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2e6367ffaad [feature](merge-cloud) Support cloud snapshot version (#30232)
2e6367ffaad is described below
commit 2e6367ffaad6def9a7a3d4f80a0e7b45f536c5fa
Author: walter <w4...@gmail.com>
AuthorDate: Tue Jan 23 19:26:03 2024 +0800
[feature](merge-cloud) Support cloud snapshot version (#30232)
In cloud mode, the meta read lock is not enough to keep a snapshot of
the partition versions. After all scan nodes are collected, it is
possible to obtain a snapshot of the partition versions. This PR
batches the requests to obtain a snapshot version and reduces the
query latency.
---
.../main/java/org/apache/doris/common/Config.java | 6 +++
.../java/org/apache/doris/catalog/OlapTable.java | 12 +++++
.../doris/common/profile/SummaryProfile.java | 12 +++--
.../org/apache/doris/planner/OlapScanNode.java | 33 +++++++++++-
.../main/java/org/apache/doris/qe/Coordinator.java | 60 +++++++++++++++++++++
.../suites/query_p0/cross_db/cross_db.groovy | 62 ++++++++++++++++++++++
6 files changed, 180 insertions(+), 5 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a93745b4844..46194c484bd 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2470,6 +2470,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int cloud_cold_read_percent = 10; // 10%
+ // The original meta read lock is not enough to keep a snapshot of partition versions,
+ // so the execution of `createScanRangeLocations` is delayed to `Coordinator::exec`,
+ // which helps to acquire a snapshot of partition versions.
+ @ConfField
+ public static boolean enable_cloud_snapshot_version = true;
+
//==========================================================================
// end of cloud config
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1d06b0305a0..937339afb5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.clone.TabletScheduler;
+import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -999,6 +1000,17 @@ public class OlapTable extends Table {
//
// ATTN: partitions not belonging to this table will be filtered.
public List<Long> selectNonEmptyPartitionIds(Collection<Long> partitionIds) {
+ if (Config.isCloudMode() && Config.enable_cloud_snapshot_version) {
+ // Assumption: all partitions are CloudPartition.
+ List<CloudPartition> partitions = partitionIds.stream()
+ .map(this::getPartition)
+ .filter(p -> p != null)
+ .filter(p -> p instanceof CloudPartition)
+ .map(p -> (CloudPartition) p)
+ .collect(Collectors.toList());
+ return CloudPartition.selectNonEmptyPartitionIds(partitions);
+ }
+
return partitionIds.stream()
.map(this::getPartition)
.filter(p -> p != null)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 01fc64e0747..90125b079eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -17,6 +17,7 @@
package org.apache.doris.common.profile;
+import org.apache.doris.common.Config;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.thrift.TUnit;
@@ -224,10 +225,13 @@ public class SummaryProfile {
executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS));
executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime());
- executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME, getPrettyGetPartitionVersionTime());
- executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT, getPrettyGetPartitionVersionCount());
- executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
- getPrettyGetPartitionVersionByHasDataCount());
+
+ if (Config.isCloudMode()) {
+ executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME, getPrettyGetPartitionVersionTime());
+ executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT, getPrettyGetPartitionVersionCount());
+ executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
+ getPrettyGetPartitionVersionByHasDataCount());
+ }
}
public void setNereidsAnalysisTime() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index e28d9d5c82c..1ccd3fae81e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -722,9 +722,37 @@ public class OlapScanNode extends ScanNode {
}
}
+    /**
+     * Updates the visible version of the scan range locations.
+     *
+     * <p>For every selected partition, the version found in {@code visibleVersionMap} is written
+     * into the scan range of each tablet of the selected index that has an entry in
+     * {@code scanRangeLocations}.
+     *
+     * @param visibleVersionMap partition id -> visible version; must contain an entry for every
+     *                          selected partition
+     * @throws IllegalStateException if a selected partition has no entry in {@code visibleVersionMap}
+     */
+    public void updateScanRangeVersions(Map<Long, Long> visibleVersionMap) {
+        Map<Long, TScanRangeLocations> locationsMap = scanRangeLocations.stream()
+                .collect(Collectors.toMap(loc -> loc.getScanRange().getPaloScanRange().getTabletId(), loc -> loc));
+        for (Long partitionId : selectedPartitionIds) {
+            Long visibleVersion = visibleVersionMap.get(partitionId);
+            if (visibleVersion == null) {
+                // Checked explicitly: an `assert` is skipped unless the JVM runs with -ea, and
+                // String.valueOf(Object) would then write the literal "null" as the version.
+                throw new IllegalStateException("the acquired version of partition " + partitionId
+                        + " does not exist in the visible version map");
+            }
+            String visibleVersionStr = String.valueOf(visibleVersion);
+
+            final Partition partition = olapTable.getPartition(partitionId);
+            final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
+            for (Tablet tablet : selectedTable.getTablets()) {
+                TScanRangeLocations locations = locationsMap.get(tablet.getId());
+                if (locations == null) {
+                    // This tablet has no entry in scanRangeLocations, so there is nothing to update.
+                    continue;
+                }
+                TPaloScanRange scanRange = locations.getScanRange().getPaloScanRange();
+                scanRange.setVersion(visibleVersionStr);
+            }
+        }
+    }
+
private void addScanRangeLocations(Partition partition,
List<Tablet> tablets) throws UserException {
- long visibleVersion = partition.getVisibleVersion();
+ long visibleVersion = Partition.PARTITION_INIT_VERSION;
+
+ // For cloud mode, set scan range visible version in Coordinator.exec so that we could
+ // assign a snapshot version of all partitions.
+ if (!(Config.isCloudMode() && Config.enable_cloud_snapshot_version)) {
+ visibleVersion = partition.getVisibleVersion();
+ }
String visibleVersionStr = String.valueOf(visibleVersion);
Set<Tag> allowedTags = Sets.newHashSet();
@@ -763,6 +791,9 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
+ //
+ // ATTN: visibleVersion is not used in cloud mode, see CloudReplica.checkVersionCatchup
+ // for details.
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
if (replicas.isEmpty()) {
if (ConnectContext.get().getSessionVariable().skipBadTablet) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 222fc37a429..244c08c3709 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -22,6 +22,8 @@ import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
@@ -147,6 +149,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
public class Coordinator implements CoordInterface {
@@ -2223,6 +2226,10 @@ public class Coordinator implements CoordInterface {
// Populates scan_range_assignment_.
// <fragment, <server, nodeId>>
private void computeScanRangeAssignment() throws Exception {
+ if (Config.isCloudMode() && Config.enable_cloud_snapshot_version) {
+ setVisibleVersionForOlapScanNode();
+ }
+
Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHostForOlapTable();
Collections.shuffle(scanNodes);
@@ -2438,6 +2445,59 @@ public class Coordinator implements CoordInterface {
// TODO: more ranges?
}
+    /**
+     * Assigns a snapshot of the partition visible versions to all OLAP scan nodes.
+     *
+     * <p>In cloud mode, the meta read lock is not enough to keep a snapshot of the partition versions.
+     * After all scan nodes are collected, the selected partitions of every {@link OlapScanNode} are
+     * gathered (each partition id once), their versions are obtained with one call to
+     * {@code CloudPartition.getSnapshotVisibleVersion}, and the result is pushed back into each
+     * scan node through {@code updateScanRangeVersions}.
+     *
+     * @throws RpcException  declared by the original signature; presumably raised by the version lookup
+     * @throws UserException if a selected partition is missing from its table, the number of returned
+     *                       versions does not match the number of requested partitions, or a
+     *                       returned version is not positive
+     */
+    private void setVisibleVersionForOlapScanNode() throws RpcException, UserException {
+        List<CloudPartition> partitions = new ArrayList<>();
+        Set<Long> partitionSet = new HashSet<>();
+        for (ScanNode node : scanNodes) {
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            OlapTable table = scanNode.getOlapTable();
+            for (Long id : scanNode.getSelectedPartitionIds()) {
+                // Set.add returns false when the id was already collected from another scan node.
+                if (!partitionSet.add(id)) {
+                    continue;
+                }
+                CloudPartition partition = (CloudPartition) table.getPartition(id);
+                if (partition == null) {
+                    // Fail with a readable error instead of a NullPointerException further down.
+                    throw new UserException("partition " + id + " does not exist");
+                }
+                partitions.add(partition);
+            }
+        }
+
+        if (partitions.isEmpty()) {
+            return;
+        }
+
+        List<Long> versions = CloudPartition.getSnapshotVisibleVersion(partitions);
+        // Checked explicitly: an `assert` is skipped unless the JVM runs with -ea, and a size mismatch
+        // would otherwise end in an IndexOutOfBoundsException or an incomplete version map.
+        if (versions.size() != partitions.size()) {
+            throw new UserException("the number of returned versions (" + versions.size()
+                    + ") is not equal to the number of requested partitions (" + partitions.size() + ")");
+        }
+
+        for (int i = 0; i < versions.size(); ++i) {
+            if (versions.get(i) <= 0) {
+                LOG.warn("partition {} getVisibleVersion error, the visibleVersion is {}",
+                        partitions.get(i).getId(), versions.get(i));
+                throw new UserException("partition " + partitions.get(i).getId()
+                        + " getVisibleVersion error, the visibleVersion is " + versions.get(i));
+            }
+        }
+
+        // ATTN: the table ids are ignored here because partition ids and table ids are allocated
+        // from the same id generator.
+        Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
+                .boxed()
+                .collect(Collectors.toMap(i -> partitions.get(i).getId(), versions::get));
+
+        for (ScanNode node : scanNodes) {
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            scanNode.updateScanRangeVersions(visibleVersionMap);
+        }
+    }
+
// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enablePipelineXEngine) {
diff --git a/regression-test/suites/query_p0/cross_db/cross_db.groovy b/regression-test/suites/query_p0/cross_db/cross_db.groovy
new file mode 100644
index 00000000000..169b18c8bfb
--- /dev/null
+++ b/regression-test/suites/query_p0/cross_db/cross_db.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+// Queries two tables that live in different databases within one statement.
+suite("cross_db") {
+    sql """CREATE DATABASE IF NOT EXISTS cross_db_1"""
+    sql """CREATE DATABASE IF NOT EXISTS cross_db_2"""
+    sql """
+        CREATE TABLE IF NOT EXISTS cross_db_1.cnt_table (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT DEFAULT "0")
+        AGGREGATE KEY(`id`, `count`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ("replication_num" = "1")
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS cross_db_2.cnt_table (
+            `id` LARGEINT NOT NULL,
+            `count` LARGEINT DEFAULT "0")
+        AGGREGATE KEY(`id`, `count`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES ("replication_num" = "1")
+    """
+    sql """
+        INSERT INTO cross_db_1.cnt_table VALUES
+            (1, 10),
+            (2, 32),
+            (3, 40),
+            (4, 33)
+    """
+
+    sql """
+        INSERT INTO cross_db_2.cnt_table VALUES
+            (1, 10),
+            (2, 32),
+            (3, 40),
+            (4, 40)
+    """
+
+    def rows = sql """SELECT * FROM cross_db_1.cnt_table,cross_db_2.cnt_table"""
+    // Each table holds 4 distinct key rows, so the join without a condition yields 4 * 4 rows.
+    assertEquals(16, rows.size())
+
+    sql """DROP DATABASE cross_db_1"""
+    sql """DROP DATABASE cross_db_2"""
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org