You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2024/01/23 11:26:13 UTC

(doris) branch master updated: [feature](merge-cloud) Support cloud snapshot version (#30232)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e6367ffaad [feature](merge-cloud) Support cloud snapshot version (#30232)
2e6367ffaad is described below

commit 2e6367ffaad6def9a7a3d4f80a0e7b45f536c5fa
Author: walter <w4...@gmail.com>
AuthorDate: Tue Jan 23 19:26:03 2024 +0800

    [feature](merge-cloud) Support cloud snapshot version (#30232)
    
    In cloud mode, the meta read lock is not enough to keep a snapshot of
    the partition versions.  After all scan nodes are collected, it is
    possible to obtain a snapshot of the partition versions.  This PR
    batches the requests to obtain a snapshot version and reduces the
    query latency.
---
 .../main/java/org/apache/doris/common/Config.java  |  6 +++
 .../java/org/apache/doris/catalog/OlapTable.java   | 12 +++++
 .../doris/common/profile/SummaryProfile.java       | 12 +++--
 .../org/apache/doris/planner/OlapScanNode.java     | 33 +++++++++++-
 .../main/java/org/apache/doris/qe/Coordinator.java | 60 +++++++++++++++++++++
 .../suites/query_p0/cross_db/cross_db.groovy       | 62 ++++++++++++++++++++++
 6 files changed, 180 insertions(+), 5 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index a93745b4844..46194c484bd 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2470,6 +2470,12 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int cloud_cold_read_percent = 10; // 10%
 
+    // The original meta read lock is not enough to keep a snapshot of partition versions,
+    // so the execution of `createScanRangeLocations` is delayed to `Coordinator::exec`,
+    // to help acquire a snapshot of partition versions.
+    @ConfField
+    public static boolean enable_cloud_snapshot_version = true;
+
     //==========================================================================
     //                      end of cloud config
     //==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1d06b0305a0..937339afb5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -35,6 +35,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.catalog.Tablet.TabletStatus;
 import org.apache.doris.clone.TabletSchedCtx;
 import org.apache.doris.clone.TabletScheduler;
+import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
@@ -999,6 +1000,17 @@ public class OlapTable extends Table {
     //
     // ATTN: partitions not belonging to this table will be filtered.
     public List<Long> selectNonEmptyPartitionIds(Collection<Long> partitionIds) {
+        if (Config.isCloudMode() && Config.enable_cloud_snapshot_version) {
+            // Assumption: all partitions are CloudPartition.
+            List<CloudPartition> partitions = partitionIds.stream()
+                    .map(this::getPartition)
+                    .filter(p -> p != null)
+                    .filter(p -> p instanceof CloudPartition)
+                    .map(p -> (CloudPartition) p)
+                    .collect(Collectors.toList());
+            return CloudPartition.selectNonEmptyPartitionIds(partitions);
+        }
+
         return partitionIds.stream()
                 .map(this::getPartition)
                 .filter(p -> p != null)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 01fc64e0747..90125b079eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.profile;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.TUnit;
@@ -224,10 +225,13 @@ public class SummaryProfile {
         executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
                 RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime());
-        executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME, getPrettyGetPartitionVersionTime());
-        executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT, getPrettyGetPartitionVersionCount());
-        executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
-                getPrettyGetPartitionVersionByHasDataCount());
+
+        if (Config.isCloudMode()) {
+            executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_TIME, getPrettyGetPartitionVersionTime());
+            executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT, getPrettyGetPartitionVersionCount());
+            executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT,
+                    getPrettyGetPartitionVersionByHasDataCount());
+        }
     }
 
     public void setNereidsAnalysisTime() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index e28d9d5c82c..1ccd3fae81e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -722,9 +722,37 @@ public class OlapScanNode extends ScanNode {
         }
     }
 
+    // Update the visible version of the scan range locations.
+    public void updateScanRangeVersions(Map<Long, Long> visibleVersionMap) {
+        Map<Long, TScanRangeLocations> locationsMap = scanRangeLocations.stream()
+                .collect(Collectors.toMap(loc -> loc.getScanRange().getPaloScanRange().getTabletId(), loc -> loc));
+        for (Long partitionId : selectedPartitionIds) {
+            final Partition partition = olapTable.getPartition(partitionId);
+            final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
+            final List<Tablet> tablets = selectedTable.getTablets();
+            Long visibleVersion = visibleVersionMap.get(partitionId);
+            assert visibleVersion != null : "the acquried version is not exists in the visible version map";
+            String visibleVersionStr = String.valueOf(visibleVersion);
+            for (Tablet tablet : tablets) {
+                TScanRangeLocations locations = locationsMap.get(tablet.getId());
+                if (locations == null) {
+                    continue;
+                }
+                TPaloScanRange scanRange = locations.getScanRange().getPaloScanRange();
+                scanRange.setVersion(visibleVersionStr);
+            }
+        }
+    }
+
     private void addScanRangeLocations(Partition partition,
             List<Tablet> tablets) throws UserException {
-        long visibleVersion = partition.getVisibleVersion();
+        long visibleVersion = Partition.PARTITION_INIT_VERSION;
+
+        // For cloud mode, set scan range visible version in Coordinator.exec so that we could
+        // assign a snapshot version of all partitions.
+        if (!(Config.isCloudMode() && Config.enable_cloud_snapshot_version)) {
+            visibleVersion = partition.getVisibleVersion();
+        }
         String visibleVersionStr = String.valueOf(visibleVersion);
 
         Set<Tag> allowedTags = Sets.newHashSet();
@@ -763,6 +791,9 @@ public class OlapScanNode extends ScanNode {
             paloRange.setTabletId(tabletId);
 
             // random shuffle List && only collect one copy
+            //
+            // ATTN: visibleVersion is not used in cloud mode, see CloudReplica.checkVersionCatchup
+            // for details.
             List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
             if (replicas.isEmpty()) {
                 if (ConnectContext.get().getSessionVariable().skipBadTablet) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 222fc37a429..244c08c3709 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -22,6 +22,8 @@ import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
@@ -147,6 +149,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 public class Coordinator implements CoordInterface {
@@ -2223,6 +2226,10 @@ public class Coordinator implements CoordInterface {
     // Populates scan_range_assignment_.
     // <fragment, <server, nodeId>>
     private void computeScanRangeAssignment() throws Exception {
+        if (Config.isCloudMode() && Config.enable_cloud_snapshot_version) {
+            setVisibleVersionForOlapScanNode();
+        }
+
         Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
         Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHostForOlapTable();
         Collections.shuffle(scanNodes);
@@ -2438,6 +2445,59 @@ public class Coordinator implements CoordInterface {
         // TODO: more ranges?
     }
 
+    // In cloud mode, the meta read lock is not enough to keep a snapshot of the partition versions.
+    // After all scan nodes are collected, it is possible to obtain a snapshot of the partition versions.
+    private void setVisibleVersionForOlapScanNode() throws RpcException, UserException {
+        List<CloudPartition> partitions = new ArrayList<>();
+        Set<Long> partitionSet = new HashSet<>();
+        for (ScanNode node : scanNodes) {
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            OlapTable table = scanNode.getOlapTable();
+            for (Long id : scanNode.getSelectedPartitionIds()) {
+                if (!partitionSet.contains(id)) {
+                    partitionSet.add(id);
+                    partitions.add((CloudPartition) table.getPartition(id));
+                }
+            }
+        }
+
+        if (partitions.isEmpty()) {
+            return;
+        }
+
+        List<Long> versions = CloudPartition.getSnapshotVisibleVersion(partitions);
+        assert versions.size() == partitions.size() : "the got num versions is not equals to acquired num versions";
+        if (versions.stream().anyMatch(x -> x <= 0)) {
+            int size = versions.size();
+            for (int i = 0; i < size; ++i) {
+                if (versions.get(i) <= 0) {
+                    LOG.warn("partition {} getVisibleVersion error, the visibleVersion is {}",
+                            partitions.get(i).getId(), versions.get(i));
+                    throw new UserException("partition " + partitions.get(i).getId()
+                            + " getVisibleVersion error, the visibleVersion is " + versions.get(i));
+                }
+            }
+        }
+
+        // ATTN: the table ids are ignored here because both ids are allocated from the same id generator.
+        Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
+                .boxed()
+                .collect(Collectors.toMap(i -> partitions.get(i).getId(), versions::get));
+
+        for (ScanNode node : scanNodes) {
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            scanNode.updateScanRangeVersions(visibleVersionMap);
+        }
+    }
+
     // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineXEngine) {
diff --git a/regression-test/suites/query_p0/cross_db/cross_db.groovy b/regression-test/suites/query_p0/cross_db/cross_db.groovy
new file mode 100644
index 00000000000..169b18c8bfb
--- /dev/null
+++ b/regression-test/suites/query_p0/cross_db/cross_db.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases are copied from https://github.com/trinodb/trino/tree/master
+// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+suite("cross_db") {
+    sql """CREATE DATABASE IF NOT EXISTS cross_db_1"""
+    sql """CREATE DATABASE IF NOT EXISTS cross_db_2"""
+    sql """
+    CREATE TABLE IF NOT EXISTS cross_db_1.cnt_table (
+        `id` LARGEINT NOT NULL,
+        `count` LARGEINT  DEFAULT "0")
+    AGGREGATE KEY(`id`, `count`)
+    DISTRIBUTED BY HASH(`id`) BUCKETS 1
+    PROPERTIES ("replication_num" = "1")
+    """
+    sql """
+    CREATE TABLE IF NOT EXISTS cross_db_2.cnt_table (
+        `id` LARGEINT NOT NULL,
+        `count` LARGEINT  DEFAULT "0")
+    AGGREGATE KEY(`id`, `count`)
+    DISTRIBUTED BY HASH(`id`) BUCKETS 1
+    PROPERTIES ("replication_num" = "1")
+    """
+    sql """
+    INSERT INTO cross_db_1.cnt_table VALUES
+        (1, 10),
+        (2, 32),
+        (3, 40),
+        (4, 33)
+    """
+
+    sql """
+    INSERT INTO cross_db_2.cnt_table VALUES
+        (1, 10),
+        (2, 32),
+        (3, 40),
+        (4, 40)
+    """
+
+    sql """SELECT * FROM cross_db_1.cnt_table,cross_db_2.cnt_table"""
+
+    sql """DROP DATABASE cross_db_1"""
+    sql """DROP DATABASE cross_db_2"""
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org