You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/12/22 14:36:55 UTC

[GitHub] [doris] morningman commented on a diff in pull request #15250: [feature] Add auto bucket implement

morningman commented on code in PR #15250:
URL: https://github.com/apache/doris/pull/15250#discussion_r1055504998


##########
fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java:
##########
@@ -93,6 +93,9 @@ public class PropertyAnalyzer {
 
     public static final String PROPERTIES_INMEMORY = "in_memory";
 
+    public static final String PROPERTIES_AUTO_BUCKET = "auto_bucket";

Review Comment:
   I didn't see this property in the example you gave.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class AutoBucketUtils {
+    private static Logger logger = LogManager.getLogger(AutoBucketUtils.class);
+
+    private static final long SIZE_100MB = 100 * 1024 * 1024L;
+    private static final long SIZE_1GB = 1 * 1024 * 1024 * 1024L;
+
+    private static int getBENum() {
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+        int activeBENum = 0;
+        for (Backend backend : backends.values()) {
+            if (backend.isAlive()) {
+                ++activeBENum;
+            }
+        }
+        return activeBENum;
+    }
+
+    private static int getBucketsNumByBEDisks() {
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+        int buckets = 0;
+        for (Backend backend : backends.values()) {
+            if (!backend.isLoadAvailable()) {

Review Comment:
   Why check `isLoadAvailable` here?



##########
fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java:
##########
@@ -140,6 +142,69 @@ private Map<String, String> createDefaultRuntimeInfo() {
         return defaultRuntimeInfo;
     }
 
+    // exponential moving average
+    private static long ema(ArrayList<Long> history, int period) {
+        double alpha = 2.0 / (period + 1);
+        double ema = history.get(0);
+        for (int i = 1; i < history.size(); i++) {
+            ema = alpha * history.get(i) + (1 - alpha) * ema;
+        }
+        return (long) ema;
+    }
+
+    private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
+        if (historyPartitionsSize.size() < 2) {
+            return historyPartitionsSize.get(0);
+        }
+
+        int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
+
+        boolean isAscending = true;
+        for (int i = 1; i < size; i++) {
+            if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
+                isAscending = false;
+                break;
+            }
+        }
+
+        if (isAscending) {
+            ArrayList<Long> historyDeltaSize = Lists.newArrayList();
+            for (int i = 1; i < size; i++) {
+                historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
+            }
+            return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
+        } else {
+            return ema(historyPartitionsSize, 7);
+        }
+    }
+
+    private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
+        if (!table.isAutoBucket()) {
+            return property.getBuckets();
+        }
+
+        // auto bucket
+        List<Partition> partitions = Lists.newArrayList(table.getPartitions());
+        if (partitions.size() == 0) {
+            return property.getBuckets();
+        }
+
+        Collections.sort(partitions, new Comparator<Partition>() {
+            @Override
+            public int compare(Partition p1, Partition p2) {
+                return (int) (p1.getId() - p2.getId());
+            }
+        });
+        ArrayList<Long> parititonsSize = Lists.newArrayList();
+        for (Partition partition : table.getPartitions()) {
+            parititonsSize.add(partition.getDataSize());
+        }
+
+        // * 5 for uncompressed data
+        long uncompressedPartionSize = getNextPartitionSize(parititonsSize) * 5;

Review Comment:
   No need for `* 5`; we can just use the compressed data size to calculate the bucket number.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class AutoBucketUtils {

Review Comment:
   A unit test needs to be written for this class.



##########
fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java:
##########
@@ -140,6 +142,69 @@ private Map<String, String> createDefaultRuntimeInfo() {
         return defaultRuntimeInfo;
     }
 
+    // exponential moving average
+    private static long ema(ArrayList<Long> history, int period) {
+        double alpha = 2.0 / (period + 1);
+        double ema = history.get(0);
+        for (int i = 1; i < history.size(); i++) {
+            ema = alpha * history.get(i) + (1 - alpha) * ema;
+        }
+        return (long) ema;
+    }
+
+    private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
+        if (historyPartitionsSize.size() < 2) {
+            return historyPartitionsSize.get(0);
+        }
+
+        int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
+
+        boolean isAscending = true;
+        for (int i = 1; i < size; i++) {
+            if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
+                isAscending = false;
+                break;
+            }
+        }
+
+        if (isAscending) {
+            ArrayList<Long> historyDeltaSize = Lists.newArrayList();
+            for (int i = 1; i < size; i++) {
+                historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
+            }
+            return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
+        } else {
+            return ema(historyPartitionsSize, 7);
+        }
+    }
+
+    private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
+        if (!table.isAutoBucket()) {
+            return property.getBuckets();
+        }
+
+        // auto bucket
+        List<Partition> partitions = Lists.newArrayList(table.getPartitions());
+        if (partitions.size() == 0) {
+            return property.getBuckets();
+        }
+
+        Collections.sort(partitions, new Comparator<Partition>() {
+            @Override
+            public int compare(Partition p1, Partition p2) {
+                return (int) (p1.getId() - p2.getId());

Review Comment:
   Also, it looks like you sort `partitions` but never use the sorted list?



##########
fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java:
##########
@@ -140,6 +142,69 @@ private Map<String, String> createDefaultRuntimeInfo() {
         return defaultRuntimeInfo;
     }
 
+    // exponential moving average
+    private static long ema(ArrayList<Long> history, int period) {
+        double alpha = 2.0 / (period + 1);
+        double ema = history.get(0);
+        for (int i = 1; i < history.size(); i++) {
+            ema = alpha * history.get(i) + (1 - alpha) * ema;
+        }
+        return (long) ema;
+    }
+
+    private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
+        if (historyPartitionsSize.size() < 2) {
+            return historyPartitionsSize.get(0);
+        }
+
+        int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
+
+        boolean isAscending = true;
+        for (int i = 1; i < size; i++) {
+            if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
+                isAscending = false;
+                break;
+            }
+        }
+
+        if (isAscending) {
+            ArrayList<Long> historyDeltaSize = Lists.newArrayList();
+            for (int i = 1; i < size; i++) {
+                historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
+            }
+            return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
+        } else {
+            return ema(historyPartitionsSize, 7);
+        }
+    }
+
+    private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
+        if (!table.isAutoBucket()) {
+            return property.getBuckets();
+        }
+
+        // auto bucket
+        List<Partition> partitions = Lists.newArrayList(table.getPartitions());
+        if (partitions.size() == 0) {
+            return property.getBuckets();
+        }
+
+        Collections.sort(partitions, new Comparator<Partition>() {
+            @Override
+            public int compare(Partition p1, Partition p2) {
+                return (int) (p1.getId() - p2.getId());

Review Comment:
   You cannot rely on the partition id to find the latest partition.
   Use the partition range value instead.



##########
fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class AutoBucketUtils {
+    private static Logger logger = LogManager.getLogger(AutoBucketUtils.class);
+
+    private static final long SIZE_100MB = 100 * 1024 * 1024L;
+    private static final long SIZE_1GB = 1 * 1024 * 1024 * 1024L;
+
+    private static int getBENum() {
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+        int activeBENum = 0;
+        for (Backend backend : backends.values()) {
+            if (backend.isAlive()) {
+                ++activeBENum;
+            }
+        }
+        return activeBENum;
+    }
+
+    private static int getBucketsNumByBEDisks() {
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+        int buckets = 0;
+        for (Backend backend : backends.values()) {
+            if (!backend.isLoadAvailable()) {
+                break;
+            }
+
+            ImmutableMap<String, DiskInfo> disks = backend.getDisks();
+            for (DiskInfo diskInfo : disks.values()) {
+                if (diskInfo.getState() == DiskState.ONLINE && diskInfo.hasPathHash()) {
+                    buckets += (diskInfo.getAvailableCapacityB() - 1) / (50 * SIZE_1GB) + 1;
+                }
+            }
+        }
+        return buckets;
+    }
+
+    private static int convertParitionSizeToBucketsNum(long partitionSize) {
+        partitionSize /= 5; // for compression 5:1
+
+        // <= 100MB, 1 bucket
+        // <= 1GB, 2 buckets
+        // > 1GB, round to (size / 1G)
+        if (partitionSize <= SIZE_100MB) {
+            return 1;
+        } else if (partitionSize <= SIZE_1GB) {
+            return 2;
+        } else {
+            return (int) ((partitionSize - 1) / SIZE_1GB + 1);
+        }
+    }
+
+    public static int getBucketsNum(long partitionSize) {
+        int bucketsNumByPartitionSize = convertParitionSizeToBucketsNum(partitionSize);
+        int bucketsNumByBE = getBucketsNumByBEDisks();
+        int bucketsNum = Math.min(128, Math.min(bucketsNumByPartitionSize, bucketsNumByBE));
+        int beNum = getBENum();
+        logger.info("AutoBucketsUtil: bucketsNumByPartitionSize {}, bucketsNumByBE {}, bucketsNum {}, beNum {}",

Review Comment:
   Too many logs. Remove some of them or change them to debug level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org