You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/09/16 02:44:07 UTC
[incubator-doris] branch master updated: [Performance] Improve performance for showing proc statistic(#6567)
This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new dd8a1da [Performance] Improve performance for showing proc statistic(#6567)
dd8a1da is described below
commit dd8a1da159d515546a4cdeeb784b2915126c9590
Author: ccoffline <45...@users.noreply.github.com>
AuthorDate: Thu Sep 16 10:43:57 2021 +0800
[Performance] Improve performance for showing proc statistic(#6567)
Co-authored-by: 迟成 <ch...@meituan.com>
---
.../common/proc/IncompleteTabletsProcNode.java | 45 +--
.../apache/doris/common/proc/StatisticProcDir.java | 304 ++++++++++-----------
.../java/org/apache/doris/task/AgentTaskQueue.java | 21 +-
3 files changed, 160 insertions(+), 210 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
index 4cdf5de..1a84647 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java
@@ -17,15 +17,14 @@
package org.apache.doris.common.proc;
+import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
public class IncompleteTabletsProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
@@ -33,41 +32,21 @@ public class IncompleteTabletsProcNode implements ProcNodeInterface {
.build();
private static final Joiner JOINER = Joiner.on(",");
- Collection<Long> unhealthyTabletIds;
- Collection<Long> inconsistentTabletIds;
- Collection<Long> cloningTabletIds;
- Collection<Long> unrecoverableTabletIds;
+ final Database db;
- public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds,
- Collection<Long> inconsistentTabletIds,
- Collection<Long> cloningTabletIds,
- Collection<Long> unrecoverableTabletIds) {
- this.unhealthyTabletIds = unhealthyTabletIds;
- this.inconsistentTabletIds = inconsistentTabletIds;
- this.cloningTabletIds = cloningTabletIds;
- this.unrecoverableTabletIds = unrecoverableTabletIds;
+ public IncompleteTabletsProcNode(Database db) {
+ this.db = db;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
- BaseProcResult result = new BaseProcResult();
-
- result.setNames(TITLE_NAMES);
-
- List<String> row = new ArrayList<String>(1);
-
- String incompleteTablets = JOINER.join(Arrays.asList(unhealthyTabletIds));
- String inconsistentTablets = JOINER.join(Arrays.asList(inconsistentTabletIds));
- String cloningTablets = JOINER.join(Arrays.asList(cloningTabletIds));
- String unrecoverableTablets = JOINER.join(Arrays.asList(unrecoverableTabletIds));
- row.add(incompleteTablets);
- row.add(inconsistentTablets);
- row.add(cloningTablets);
- row.add(unrecoverableTablets);
-
- result.addRow(row);
-
- return result;
+ StatisticProcDir.DBStatistic statistic = new StatisticProcDir.DBStatistic(db);
+ return new BaseProcResult(TITLE_NAMES, Collections.singletonList(Arrays.asList(
+ JOINER.join(statistic.unhealthyTabletIds),
+ JOINER.join(statistic.inconsistentTabletIds),
+ JOINER.join(statistic.cloningTabletIds),
+ JOINER.join(statistic.unrecoverableTabletIds)
+ )));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
index 1246ad7..e8255f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
@@ -24,29 +24,31 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ReplicaAllocation;
-import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.util.ListComparator;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class StatisticProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
@@ -58,166 +60,28 @@ public class StatisticProcDir implements ProcDirInterface {
private Catalog catalog;
- // db id -> set(tablet id)
- Multimap<Long, Long> unhealthyTabletIds;
- // db id -> set(tablet id)
- Multimap<Long, Long> inconsistentTabletIds;
- // db id -> set(tablet id)
- Multimap<Long, Long> cloningTabletIds;
- // db id -> set(tablet id)
- Multimap<Long, Long> unrecoverableTabletIds;
-
public StatisticProcDir(Catalog catalog) {
+ Preconditions.checkNotNull(catalog);
this.catalog = catalog;
- unhealthyTabletIds = HashMultimap.create();
- inconsistentTabletIds = HashMultimap.create();
- cloningTabletIds = HashMultimap.create();
- unrecoverableTabletIds = HashMultimap.create();
}
@Override
public ProcResult fetchResult() throws AnalysisException {
- Preconditions.checkNotNull(catalog);
-
- BaseProcResult result = new BaseProcResult();
-
- result.setNames(TITLE_NAMES);
- List<Long> dbIds = catalog.getDbIds();
- if (dbIds == null || dbIds.isEmpty()) {
- // empty
- return result;
- }
-
- SystemInfoService infoService = Catalog.getCurrentSystemInfo();
-
- int totalDbNum = 0;
- int totalTableNum = 0;
- int totalPartitionNum = 0;
- int totalIndexNum = 0;
- int totalTabletNum = 0;
- int totalReplicaNum = 0;
-
- unhealthyTabletIds.clear();
- inconsistentTabletIds.clear();
- cloningTabletIds = AgentTaskQueue.getTabletIdsByType(TTaskType.CLONE);
- List<List<Comparable>> lines = new ArrayList<List<Comparable>>();
- for (Long dbId : dbIds) {
- if (dbId == 0) {
+ List<DBStatistic> statistics = catalog.getDbIds().parallelStream()
// skip information_schema database
- continue;
- }
- Database db = catalog.getDbNullable(dbId);
- if (db == null) {
- continue;
- }
-
- ++totalDbNum;
- List<Long> aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true);
- db.readLock();
- try {
- int dbTableNum = 0;
- int dbPartitionNum = 0;
- int dbIndexNum = 0;
- int dbTabletNum = 0;
- int dbReplicaNum = 0;
-
- for (Table table : db.getTables()) {
- if (table.getType() != TableType.OLAP) {
- continue;
- }
-
- ++dbTableNum;
- OlapTable olapTable = (OlapTable) table;
- table.readLock();
- try {
- for (Partition partition : olapTable.getAllPartitions()) {
- ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
- ++dbPartitionNum;
- for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
- ++dbIndexNum;
- for (Tablet tablet : materializedIndex.getTablets()) {
- ++dbTabletNum;
- dbReplicaNum += tablet.getReplicas().size();
-
- Pair<TabletStatus, Priority> res = tablet.getHealthStatusWithPriority(
- infoService, db.getClusterName(),
- partition.getVisibleVersion(), partition.getVisibleVersionHash(),
- replicaAlloc, aliveBeIdsInCluster);
-
- // here we treat REDUNDANT as HEALTHY, for user friendly.
- if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT
- && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR
- && res.first != TabletStatus.UNRECOVERABLE) {
- unhealthyTabletIds.put(dbId, tablet.getId());
- } else if (res.first == TabletStatus.UNRECOVERABLE) {
- unrecoverableTabletIds.put(dbId, tablet.getId());
- }
-
- if (!tablet.isConsistent()) {
- inconsistentTabletIds.put(dbId, tablet.getId());
- }
- } // end for tablets
- } // end for indices
- } // end for partitions
- } finally {
- table.readUnlock();
- }
- } // end for tables
-
- List<Comparable> oneLine = new ArrayList<Comparable>(TITLE_NAMES.size());
- oneLine.add(dbId);
- oneLine.add(db.getFullName());
- oneLine.add(dbTableNum);
- oneLine.add(dbPartitionNum);
- oneLine.add(dbIndexNum);
- oneLine.add(dbTabletNum);
- oneLine.add(dbReplicaNum);
- oneLine.add(unhealthyTabletIds.get(dbId).size());
- oneLine.add(inconsistentTabletIds.get(dbId).size());
- oneLine.add(cloningTabletIds.get(dbId).size());
- oneLine.add(unrecoverableTabletIds.get(dbId).size());
-
- lines.add(oneLine);
-
- totalTableNum += dbTableNum;
- totalPartitionNum += dbPartitionNum;
- totalIndexNum += dbIndexNum;
- totalTabletNum += dbTabletNum;
- totalReplicaNum += dbReplicaNum;
- } finally {
- db.readUnlock();
- }
- } // end for dbs
-
- // sort by dbName
- ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(1);
- Collections.sort(lines, comparator);
-
- // add sum line after sort
- List<Comparable> finalLine = new ArrayList<Comparable>(TITLE_NAMES.size());
- finalLine.add("Total");
- finalLine.add(totalDbNum);
- finalLine.add(totalTableNum);
- finalLine.add(totalPartitionNum);
- finalLine.add(totalIndexNum);
- finalLine.add(totalTabletNum);
- finalLine.add(totalReplicaNum);
- finalLine.add(unhealthyTabletIds.size());
- finalLine.add(inconsistentTabletIds.size());
- finalLine.add(cloningTabletIds.size());
- finalLine.add(unrecoverableTabletIds.size());
- lines.add(finalLine);
-
- // add result
- for (List<Comparable> line : lines) {
- List<String> row = new ArrayList<String>(line.size());
- for (Comparable comparable : line) {
- row.add(comparable.toString());
- }
- result.addRow(row);
+ .flatMap(id -> Stream.of(id == 0 ? null : catalog.getDbNullable(id)))
+ .filter(Objects::nonNull).map(DBStatistic::new)
+ // sort by dbName
+ .sorted(Comparator.comparing(db -> db.db.getFullName()))
+ .collect(Collectors.toList());
+
+ List<List<String>> rows = new ArrayList<>(statistics.size() + 1);
+ for (DBStatistic statistic : statistics) {
+ rows.add(statistic.toRow());
}
+ rows.add(statistics.stream().reduce(new DBStatistic(), DBStatistic::reduce).toRow());
- return result;
+ return new BaseProcResult(TITLE_NAMES, rows);
}
@Override
@@ -227,16 +91,132 @@ public class StatisticProcDir implements ProcDirInterface {
@Override
public ProcNodeInterface lookup(String dbIdStr) throws AnalysisException {
- long dbId = -1L;
try {
- dbId = Long.valueOf(dbIdStr);
+ long dbId = Long.parseLong(dbIdStr);
+ return catalog.getDb(dbId).map(IncompleteTabletsProcNode::new).orElse(null);
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid db id format: " + dbIdStr);
}
+ }
+
+ static class DBStatistic {
+ boolean summary;
+ Database db;
+ int dbNum;
+ int tableNum;
+ int partitionNum;
+ int indexNum;
+ int tabletNum;
+ int replicaNum;
+ int unhealthyTabletNum;
+ int inconsistentTabletNum;
+ int cloningTabletNum;
+ int badTabletNum;
+ Set<Long> unhealthyTabletIds;
+ Set<Long> inconsistentTabletIds;
+ Set<Long> cloningTabletIds;
+ Set<Long> unrecoverableTabletIds;
+
+ DBStatistic() {
+ this.summary = true;
+ }
+
+ DBStatistic(Database db) {
+ Preconditions.checkNotNull(db);
+ this.summary = false;
+ this.db = db;
+ this.dbNum = 1;
+ this.unhealthyTabletIds = new HashSet<>();
+ this.inconsistentTabletIds = new HashSet<>();
+ this.unrecoverableTabletIds = new HashSet<>();
+ this.cloningTabletIds = AgentTaskQueue.getTask(db.getId(), TTaskType.CLONE)
+ .stream().map(AgentTask::getTabletId).collect(Collectors.toSet());
+
+ SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+ List<Long> aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true);
+ db.getTables().stream().filter(t -> t != null && t.getType() == TableType.OLAP).forEach(t -> {
+ ++tableNum;
+ OlapTable olapTable = (OlapTable) t;
+ olapTable.readLock();
+ try {
+ for (Partition partition : olapTable.getAllPartitions()) {
+ ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
+ ++partitionNum;
+ for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ ++indexNum;
+ for (Tablet tablet : materializedIndex.getTablets()) {
+ ++tabletNum;
+ replicaNum += tablet.getReplicas().size();
+
+ Pair<TabletStatus, Priority> res = tablet.getHealthStatusWithPriority(
+ infoService, db.getClusterName(),
+ partition.getVisibleVersion(), partition.getVisibleVersionHash(),
+ replicaAlloc, aliveBeIdsInCluster);
+
+ // here we treat REDUNDANT as HEALTHY, for user friendly.
+ if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT
+ && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR
+ && res.first != TabletStatus.UNRECOVERABLE) {
+ unhealthyTabletIds.add(tablet.getId());
+ } else if (res.first == TabletStatus.UNRECOVERABLE) {
+ unrecoverableTabletIds.add(tablet.getId());
+ }
+
+ if (!tablet.isConsistent()) {
+ inconsistentTabletIds.add(tablet.getId());
+ }
+ } // end for tablets
+ } // end for indices
+ } // end for partitions
+ } finally {
+ olapTable.readUnlock();
+ }
+ });
+ unhealthyTabletNum = unhealthyTabletIds.size();
+ inconsistentTabletNum = inconsistentTabletIds.size();
+ cloningTabletNum = cloningTabletIds.size();
+ badTabletNum = unrecoverableTabletIds.size();
+ }
+
+ DBStatistic reduce(DBStatistic other) {
+ if (this.summary) {
+ this.dbNum += other.dbNum;
+ this.tableNum += other.tableNum;
+ this.partitionNum += other.partitionNum;
+ this.indexNum += other.indexNum;
+ this.tabletNum += other.tabletNum;
+ this.replicaNum += other.replicaNum;
+ this.unhealthyTabletNum += other.unhealthyTabletNum;
+ this.inconsistentTabletNum += other.inconsistentTabletNum;
+ this.cloningTabletNum += other.cloningTabletNum;
+ this.badTabletNum += other.badTabletNum;
+ return this;
+ } else if (other.summary) {
+ return other.reduce(this);
+ } else {
+ return new DBStatistic().reduce(this).reduce(other);
+ }
+ }
- return new IncompleteTabletsProcNode(unhealthyTabletIds.get(dbId),
- inconsistentTabletIds.get(dbId),
- cloningTabletIds.get(dbId),
- unrecoverableTabletIds.get(dbId));
+ List<String> toRow() {
+ List<Object> row = new ArrayList<>(TITLE_NAMES.size());
+ if (summary) {
+ row.add("Total");
+ row.add(dbNum);
+ } else {
+ row.add(db.getId());
+ row.add(db.getFullName());
+ }
+ row.add(tableNum);
+ row.add(partitionNum);
+ row.add(indexNum);
+ row.add(tabletNum);
+ row.add(replicaNum);
+ row.add(unhealthyTabletNum);
+ row.add(inconsistentTabletNum);
+ row.add(cloningTabletNum);
+ row.add(badTabletNum);
+ return row.stream().map(String::valueOf).collect(Collectors.toList());
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 96aba8e..59ce71c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -21,10 +21,8 @@ import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
@@ -152,6 +150,12 @@ public class AgentTaskQueue {
return res;
}
+ public static synchronized List<AgentTask> getTask(long dbId, TTaskType type) {
+ Map<Long, Map<Long, AgentTask>> taskMap = tasks.column(type);
+ Map<Long, AgentTask> signatureMap = taskMap == null ? null : taskMap.get(dbId);
+ return signatureMap == null ? new ArrayList<>() : new ArrayList<>(signatureMap.values());
+ }
+
public static synchronized List<AgentTask> getDiffTasks(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
List<AgentTask> diffTasks = new ArrayList<AgentTask>();
if (!tasks.containsRow(backendId)) {
@@ -217,19 +221,6 @@ public class AgentTaskQueue {
return taskNum;
}
- public static synchronized Multimap<Long, Long> getTabletIdsByType(TTaskType type) {
- Multimap<Long, Long> tabletIds = HashMultimap.create();
- Map<Long, Map<Long, AgentTask>> taskMap = tasks.column(type);
- if (taskMap != null) {
- for (Map<Long, AgentTask> signatureMap : taskMap.values()) {
- for (AgentTask task : signatureMap.values()) {
- tabletIds.put(task.getDbId(), task.getTabletId());
- }
- }
- }
- return tabletIds;
- }
-
public static synchronized int getTaskNum(long backendId, TTaskType type, boolean isFailed) {
int taskNum = 0;
if (backendId != -1) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org