You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/09/17 02:11:47 UTC

[incubator-doris] branch master updated: [Bug] Fix some bugs (#6665)

This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fee8e6a  [Bug] Fix some bugs (#6665)
fee8e6a is described below

commit fee8e6afc56709d1616e310fc9bb229e63439630
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Sep 17 10:11:37 2021 +0800

    [Bug] Fix some bugs (#6665)
    
    1. Fix a potential BE coredump when sending batches while loading data. (Fix [Bug] BE crash when loading data #6656)
    2. Fix a potential BE coredump when doing schema change. (Fix [Bug] BE crash when doing alter task #6657)
    3. Optimize the metric of base_compaction_request_failed.
    4. Add an Order column to the SHOW TABLET result. (Fix [Feature] Add order column in SHOW TABLET stmt result #6658)
    5. Fix bug that the tablet repair slot is not released. (Fix [Bug] Tablet scheduler stop working #6659)
    6. Fix bug that the REPLICA_MISSING error cannot be handled. (Fix [Bug] REPLICA_MISSING error can not be handled. #6660)
    7. Modify the column name of SHOW PROC "/cluster_balance/cluster_load_stat"
    8. Optimize the result of SHOW PROC "/statistic" to show COLOCATE_MISMATCH tablets (Fix [Feature] the health status of colocate table's tablet is not shown in show proc statistic #6663)
    9. Fix bug that show load where state='pending' cannot be executed. (Fix [Bug] show load where state='pending' can not be executed. #6664)
---
 be/src/exec/tablet_sink.cpp                        |   1 +
 be/src/olap/schema_change.cpp                      |   8 ++-
 be/src/olap/tablet.cpp                             |   2 +-
 fe/fe-core/AlterRoutineLoadOperationLogTest        | Bin 478 -> 0 bytes
 fe/fe-core/diskInfoTest                            | Bin 158 -> 0 bytes
 .../org/apache/doris/analysis/ShowLoadStmt.java    |   2 +-
 .../org/apache/doris/analysis/ShowTabletStmt.java  |   1 +
 .../java/org/apache/doris/catalog/Replica.java     |   6 +++
 .../main/java/org/apache/doris/catalog/Tablet.java |  10 ++--
 .../org/apache/doris/clone/TabletSchedCtx.java     |  58 +++++++++++----------
 .../org/apache/doris/clone/TabletScheduler.java    |  10 ++--
 .../doris/common/proc/ClusterLoadStatByTag.java    |  30 ++++++-----
 .../apache/doris/common/proc/StatisticProcDir.java |  31 +++++++----
 .../java/org/apache/doris/load/LoadChecker.java    |   3 +-
 .../apache/doris/planner/DistributedPlanner.java   |   4 +-
 .../org/apache/doris/planner/OlapScanNode.java     |   8 ++-
 .../main/java/org/apache/doris/qe/Coordinator.java |   2 +-
 .../java/org/apache/doris/qe/ShowExecutor.java     |  17 +++---
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +-
 .../apache/doris/analysis/ShowLoadStmtTest.java    |  10 +++-
 .../org/apache/doris/clone/TabletSchedCtxTest.java |  37 +++++++++++++
 21 files changed, 162 insertions(+), 80 deletions(-)

diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f780971..9c1e31c 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -845,6 +845,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
     _stop_background_threads_latch.count_down();
     if (_sender_thread) {
         _sender_thread->join();
+        _send_batch_thread_pool_token->shutdown();
     }
 
     Expr::close(_output_expr_ctxs, state);
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 04e5070..49a7f96 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1573,7 +1573,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
         }
 
         for (auto& rs_reader : rs_readers) {
-            rs_reader->init(&reader_context);
+            res = rs_reader->init(&reader_context);
+            if (res != OLAP_SUCCESS) {
+                LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
+                break;
+            }
         }
 
     } while (0);
@@ -1717,7 +1721,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
 
     RowsetReaderSharedPtr rowset_reader;
     RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader));
-    rowset_reader->init(&reader_context);
+    RETURN_NOT_OK(rowset_reader->init(&reader_context));
 
     RowsetWriterContext writer_context;
     writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1746c5d..c1471fa 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1345,8 +1345,8 @@ int64_t Tablet::prepare_compaction_and_calculate_permits(CompactionType compacti
         OLAPStatus res = _base_compaction->prepare_compact();
         if (res != OLAP_SUCCESS) {
             set_last_base_compaction_failure_time(UnixMillis());
-            DorisMetrics::instance()->base_compaction_request_failed->increment(1);
             if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
+                DorisMetrics::instance()->base_compaction_request_failed->increment(1);
                 LOG(WARNING) << "failed to pick rowsets for base compaction. res=" << res
                              << ", tablet=" << full_name();
             }
diff --git a/fe/fe-core/AlterRoutineLoadOperationLogTest b/fe/fe-core/AlterRoutineLoadOperationLogTest
deleted file mode 100644
index ae3953e..0000000
Binary files a/fe/fe-core/AlterRoutineLoadOperationLogTest and /dev/null differ
diff --git a/fe/fe-core/diskInfoTest b/fe/fe-core/diskInfoTest
deleted file mode 100644
index 866d039..0000000
Binary files a/fe/fe-core/diskInfoTest and /dev/null differ
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
index 610477e..afee714 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
@@ -219,7 +219,7 @@ public class ShowLoadStmt extends ShowStmt {
                 break CHECK;
             }
 
-            if (!isAccurateMatch && !value.contains("%")) {
+            if (hasLabel && !isAccurateMatch && !value.contains("%")) {
                 value = "%" + value + "%";
             }
             if (hasLabel) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java
index ac8063a..d31db26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java
@@ -284,6 +284,7 @@ public class ShowTabletStmt extends ShowStmt {
             builder.addColumn(new Column("PartitionId", ScalarType.createVarchar(30)));
             builder.addColumn(new Column("IndexId", ScalarType.createVarchar(30)));
             builder.addColumn(new Column("IsSync", ScalarType.createVarchar(30)));
+            builder.addColumn(new Column("Order", ScalarType.createVarchar(30)));
             builder.addColumn(new Column("DetailCmd", ScalarType.createVarchar(30)));
         } else {
             for (String title : TabletsProcDir.TITLE_NAMES) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 42a7790..70f4709 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -563,4 +563,10 @@ public class Replica implements Writable {
     public long getWatermarkTxnId() {
         return watermarkTxnId;
     }
+
+    public boolean isAlive() {
+        return getState() != ReplicaState.CLONE
+                && getState() != ReplicaState.DECOMMISSION
+                && !isBad();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index f1d3b01..cec8d8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -27,9 +27,6 @@ import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -37,6 +34,9 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -419,9 +419,7 @@ public class Tablet extends MetaObject implements Writable {
         Set<String> hosts = Sets.newHashSet();
         for (Replica replica : replicas) {
             Backend backend = systemInfoService.getBackend(replica.getBackendId());
-            if (backend == null || !backend.isAlive() || replica.getState() == ReplicaState.CLONE
-                    || replica.getState() == ReplicaState.DECOMMISSION
-                    || replica.isBad() || !hosts.add(backend.getHost())) {
+            if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())) {
                 // this replica is not alive,
                 // or if this replica is on same host with another replica, we also treat it as 'dead',
                 // so that Tablet Scheduler will create a new replica on different host.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 18e66b1..b34af4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -54,6 +54,8 @@ import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -97,7 +99,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
      * from the tablet scheduler.
      */
     private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;
-    
+
+    private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator();
+
     public enum Type {
         BALANCE, REPAIR
     }
@@ -504,50 +508,33 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             if (replica.getLastFailedVersion() > 0) {
                 continue;
             }
-            
+
             if (!replica.checkVersionCatchUp(visibleVersion, visibleVersionHash, false)) {
                 continue;
             }
-            
+
             candidates.add(replica);
         }
-        
+
         if (candidates.isEmpty()) {
             throw new SchedException(Status.UNRECOVERABLE, "unable to find source replica");
         }
 
         // choose a replica which slot is available from candidates.
-        long minVersionCount = Long.MAX_VALUE;
-        boolean findSrcReplica = false;
+        // sort replica by version count asc, so that we prefer to choose replicas with fewer versions
+        Collections.sort(candidates, VERSION_COUNTER_COMPARATOR);
         for (Replica srcReplica : candidates) {
             PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId());
             if (slot == null) {
                 continue;
             }
-            
+
             long srcPathHash = slot.takeSlot(srcReplica.getPathHash());
             if (srcPathHash != -1) {
-                if (!findSrcReplica) {
-                    // version count is set by report process, so it may not be set yet and default value is -1.
-                    // so we need to check it.
-                    minVersionCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount();
-                    setSrc(srcReplica);
-                    findSrcReplica = true;
-                } else {
-                    long curVerCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount();
-                    if (curVerCount < minVersionCount) {
-                        minVersionCount = curVerCount;
-                        setSrc(srcReplica);
-                        findSrcReplica = true;
-                    }
-                }
+                setSrc(srcReplica);
+                return;
             }
         }
-
-        if (findSrcReplica) {
-            return;
-        }
-        
         throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot");
     }
     
@@ -623,7 +610,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         if (destPathHash == -1) {
             throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
         }
-
         if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
             // Since this replica is selected as the repair object of VERSION_INCOMPLETE,
             // it means that this replica needs to be able to accept loading data.
@@ -666,7 +652,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                 }
             }
         }
-        
+
         if (destPathHash != -1) {
             PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(destBackendId);
             if (slot != null) {
@@ -1102,4 +1088,20 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         }
         return sb.toString();
     }
+
+    // Comparator to sort the replica with version count, asc
+    public static class VersionCountComparator implements Comparator<Replica> {
+        @Override
+        public int compare(Replica r1, Replica r2) {
+            long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : r1.getVersionCount();
+            long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : r2.getVersionCount();
+            if (verCount1 < verCount2) {
+                return -1;
+            } else if (verCount1 > verCount2) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 907c917..d63772c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -638,7 +638,7 @@ public class TabletScheduler extends MasterDaemon {
         Map<Tag, Short> currentAllocMap = Maps.newHashMap();
         for (Replica replica : replicas) {
             Backend be = infoService.getBackend(replica.getBackendId());
-            if (be != null) {
+            if (be != null && be.isAlive() && replica.isAlive()) {
                 Short num = currentAllocMap.getOrDefault(be.getTag(), (short) 0);
                 currentAllocMap.put(be.getTag(), (short) (num + 1));
             }
@@ -1196,11 +1196,11 @@ public class TabletScheduler extends MasterDaemon {
 
             PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
             if (slot == null) {
-                LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId());
                 continue;
             }
 
-            if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) {
+            long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
+            if (pathHash != -1) {
                 return rootPathLoadStatistic;
             }
         }
@@ -1209,11 +1209,11 @@ public class TabletScheduler extends MasterDaemon {
         for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
             PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
             if (slot == null) {
-                LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId());
                 continue;
             }
 
-            if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) {
+            long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
+            if (pathHash != -1) {
                 return rootPathLoadStatistic;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java
index 095143c..0f2fb8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java
@@ -34,25 +34,15 @@ import java.util.Set;
 // SHOW PROC "/cluster_balance/cluster_load_stat"
 public class ClusterLoadStatByTag implements ProcDirInterface {
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add(
-            "StorageMedium").build();
-
-    private Map<String, Tag> tagMap = Maps.newHashMap();
+            "Tag").build();
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
         BaseProcResult result = new BaseProcResult();
         result.setNames(TITLE_NAMES);
-        List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
-        Set<Tag> tags = Sets.newHashSet();
-        for (long beId : beIds) {
-            Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
-            if (be != null) {
-                tags.add(be.getTag());
-            }
-        }
+        Set<Tag> tags = genTagMap();
         for (Tag tag : tags) {
             result.addRow(Lists.newArrayList(tag.toKey()));
-            tagMap.put(tag.toKey(), tag);
         }
         return result;
     }
@@ -64,6 +54,11 @@ public class ClusterLoadStatByTag implements ProcDirInterface {
 
     @Override
     public ProcNodeInterface lookup(String name) throws AnalysisException {
+        Set<Tag> tags = genTagMap();
+        Map<String, Tag> tagMap = Maps.newHashMap();
+        for (Tag tag : tags) {
+            tagMap.put(tag.toKey(), tag);
+        }
         Tag tag = tagMap.get(name);
         if (tag == null) {
             throw new AnalysisException("No such tag: " + name);
@@ -71,4 +66,15 @@ public class ClusterLoadStatByTag implements ProcDirInterface {
         return new ClusterLoadStatByTagAndMedium(tag);
     }
 
+    private Set<Tag> genTagMap() {
+        Set<Tag> tags = Sets.newHashSet();
+        List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        for (long beId : beIds) {
+            Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
+            if (be != null) {
+                tags.add(be.getTag());
+            }
+        }
+        return tags;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
index e8255f4..3670747 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -133,10 +134,13 @@ public class StatisticProcDir implements ProcDirInterface {
                     .stream().map(AgentTask::getTabletId).collect(Collectors.toSet());
 
             SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+            ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex();
             List<Long> aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true);
             db.getTables().stream().filter(t -> t != null && t.getType() == TableType.OLAP).forEach(t -> {
                 ++tableNum;
                 OlapTable olapTable = (OlapTable) t;
+                ColocateTableIndex.GroupId groupId = colocateTableIndex.isColocateTable(olapTable.getId()) ?
+                        colocateTableIndex.getGroup(olapTable.getId()) : null;
                 olapTable.readLock();
                 try {
                     for (Partition partition : olapTable.getAllPartitions()) {
@@ -144,21 +148,30 @@ public class StatisticProcDir implements ProcDirInterface {
                         ++partitionNum;
                         for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                             ++indexNum;
-                            for (Tablet tablet : materializedIndex.getTablets()) {
+                            List<Tablet> tablets = materializedIndex.getTablets();
+                            for (int i = 0; i < tablets.size(); ++i) {
+                                Tablet tablet = tablets.get(i);
                                 ++tabletNum;
                                 replicaNum += tablet.getReplicas().size();
 
-                                Pair<TabletStatus, Priority> res = tablet.getHealthStatusWithPriority(
-                                        infoService, db.getClusterName(),
-                                        partition.getVisibleVersion(), partition.getVisibleVersionHash(),
-                                        replicaAlloc, aliveBeIdsInCluster);
+                                TabletStatus res = null;
+                                if (groupId != null) {
+                                    Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i);
+                                    res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, backendsSet);
+                                } else {
+                                    Pair<TabletStatus, Priority> pair = tablet.getHealthStatusWithPriority(
+                                            infoService, db.getClusterName(),
+                                            partition.getVisibleVersion(), partition.getVisibleVersionHash(),
+                                            replicaAlloc, aliveBeIdsInCluster);
+                                    res = pair.first;
+                                }
 
                                 // here we treat REDUNDANT as HEALTHY, for user friendly.
-                                if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT
-                                        && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR
-                                        && res.first != TabletStatus.UNRECOVERABLE) {
+                                if (res != TabletStatus.HEALTHY && res != TabletStatus.REDUNDANT
+                                        && res != TabletStatus.COLOCATE_REDUNDANT && res != TabletStatus.NEED_FURTHER_REPAIR
+                                        && res != TabletStatus.UNRECOVERABLE) {
                                     unhealthyTabletIds.add(tablet.getId());
-                                } else if (res.first == TabletStatus.UNRECOVERABLE) {
+                                } else if (res == TabletStatus.UNRECOVERABLE) {
                                     unrecoverableTabletIds.add(tablet.getId());
                                 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index 32f36f6..3793acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -170,7 +170,8 @@ public class LoadChecker extends MasterDaemon {
                         task = new HadoopLoadPendingTask(job);
                         break;
                     default:
-                        LOG.warn("unknown etl job type. type: {}", etlJobType.name());
+                        LOG.warn("unknown etl job type. type: {}, job id: {}, label: {}, db: {}",
+                                etlJobType.name(), job.getId(), job.getLabel(), job.getDbId());
                         break;
                 }
                 if (task != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 8c3b609..7a166ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -182,7 +182,7 @@ public class DistributedPlanner {
      */
     private PlanFragment createPlanFragments(
             PlanNode root, boolean isPartitioned,
-            long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException, AnalysisException {
+            long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException {
         ArrayList<PlanFragment> childFragments = Lists.newArrayList();
         for (PlanNode child : root.getChildren()) {
             // allow child fragments to be partitioned, unless they contain a limit clause
@@ -272,7 +272,7 @@ public class DistributedPlanner {
      * fragment
      * TODO: hbase scans are range-partitioned on the row key
      */
-    private PlanFragment createScanFragment(PlanNode node) {
+    private PlanFragment createScanFragment(PlanNode node) throws UserException {
         if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) {
             return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
         } else if (node instanceof SchemaScanNode) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 3423487..246788c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -903,14 +903,18 @@ public class OlapScanNode extends ScanNode {
     The reason is that @coordicator will not set the scan range for the fragment,
         when data partition of fragment is UNPARTITION.
      */
-    public DataPartition constructInputPartitionByDistributionInfo() {
+    public DataPartition constructInputPartitionByDistributionInfo() throws UserException {
         ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex();
         if ((colocateTableIndex.isColocateTable(olapTable.getId())
                 && !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId())))
                 || olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED
                 || olapTable.getPartitions().size() == 1) {
             DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
-            Preconditions.checkState(distributionInfo instanceof HashDistributionInfo);
+            if (!(distributionInfo instanceof HashDistributionInfo)) {
+                // There may be some random distribution table left, throw exception here.
+                // And these table should be modified to hash distribution by ALTER TABLE operation.
+                throw new UserException("Table with non hash distribution is not supported: " + olapTable.getName());
+            }
             List<Column> distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
             List<Expr> dataDistributeExprs = Lists.newArrayList();
             for (Column column : distributeColumns) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2ded32b..e4196dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -596,7 +596,7 @@ public class Coordinator {
                         cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
                         switch (code) {
                             case TIMEOUT:
-                                throw new UserException("send fragment timeout. backend id: " + pair.first.backend.getId());
+                                throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: " + pair.first.backend.getId());
                             case THRIFT_RPC_ERROR:
                                 SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
                                 throw new RpcException(pair.first.backend.getHost(), "rpc failed");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 8778104..e7f7003 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -155,16 +155,16 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 
-import org.apache.commons.lang3.tuple.Triple;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -1387,6 +1387,7 @@ public class ShowExecutor {
             String indexName = FeConstants.null_string;
             Boolean isSync = true;
 
+            int tabletIdx = -1;
             // check real meta
             do {
                 Database db = catalog.getDbNullable(dbId);
@@ -1425,6 +1426,8 @@ public class ShowExecutor {
                         break;
                     }
 
+                    tabletIdx = index.getTabletOrderIdx(tablet.getId());
+
                     List<Replica> replicas = tablet.getReplicas();
                     for (Replica replica : replicas) {
                         Replica tmp = invertedIndex.getReplica(tabletId, replica.getBackendId());
@@ -1447,9 +1450,9 @@ public class ShowExecutor {
             String detailCmd = String.format("SHOW PROC '/dbs/%d/%d/partitions/%d/%d/%d';",
                                              dbId, tableId, partitionId, indexId, tabletId);
             rows.add(Lists.newArrayList(dbName, tableName, partitionName, indexName,
-                                        dbId.toString(), tableId.toString(),
-                                        partitionId.toString(), indexId.toString(),
-                                        isSync.toString(), detailCmd));
+                    dbId.toString(), tableId.toString(),
+                    partitionId.toString(), indexId.toString(),
+                    isSync.toString(), String.valueOf(tabletIdx), detailCmd));
         } else {
             Database db = catalog.getDbOrAnalysisException(showStmt.getDbName());
             OlapTable olapTable = db.getOlapTableOrAnalysisException(showStmt.getTableName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c885273..d581e70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -402,7 +402,7 @@ public class StmtExecutor implements ProfileWriter {
             throw e;
         } catch (UserException e) {
             // analysis exception only print message, not print the stack
-            LOG.warn("execute Exception. {}", e);
+            LOG.warn("execute Exception. {}", e.getMessage());
             context.getState().setError(e.getMessage());
             context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
         } catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java
index b9027f3..66d807a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.analysis;
 
-import mockit.Expectations;
 import org.apache.doris.analysis.BinaryPredicate.Operator;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FakeCatalog;
@@ -29,6 +28,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import mockit.Expectations;
+
 public class ShowLoadStmtTest {
     private Analyzer analyzer;
     private Catalog catalog;
@@ -110,10 +111,15 @@ public class ShowLoadStmtTest {
 
         StringLiteral stringLiteralLike = new StringLiteral("ab%");
         LikePredicate likePredicate = new LikePredicate(org.apache.doris.analysis.LikePredicate.Operator.LIKE,
-                                                        slotRef, stringLiteralLike);
+                slotRef, stringLiteralLike);
 
         stmt = new ShowLoadStmt(null, likePredicate, null, new LimitElement(10));
         stmt.analyze(analyzer);
         Assert.assertEquals("SHOW LOAD FROM `testCluster:testDb` WHERE `label` LIKE \'ab%\' LIMIT 10", stmt.toString());
+
+        BinaryPredicate statePredicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "state"), new StringLiteral("PENDING"));
+        stmt = new ShowLoadStmt(null, statePredicate, null, new LimitElement(10));
+        stmt.analyze(analyzer);
+        Assert.assertEquals("SHOW LOAD FROM `testCluster:testDb` WHERE `state` = \'PENDING\' LIMIT 10", stmt.toString());
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index b9d9ed2..e9d3119 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -17,13 +17,18 @@
 
 package org.apache.doris.clone;
 
+import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.clone.TabletSchedCtx.Type;
 
+import com.google.common.collect.Lists;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.PriorityQueue;
 
 public class TabletSchedCtxTest {
@@ -75,4 +80,36 @@ public class TabletSchedCtxTest {
         Assert.assertEquals(ctx2.getTabletId(), expectedCtx.getTabletId());
     }
 
+    @Test
+    public void testVersionCountComparator() { // sorts four replicas with VersionCountComparator and checks the resulting order
+        TabletSchedCtx.VersionCountComparator countComparator = new TabletSchedCtx.VersionCountComparator();
+        List<Replica> replicaList = Lists.newArrayList();
+        Replica replica1 = new Replica();
+        replica1.setVersionCount(100);
+        replica1.setState(Replica.ReplicaState.NORMAL);
+
+        Replica replica2 = new Replica();
+        replica2.setVersionCount(50); // smallest non-negative count; expected first after sorting
+        replica2.setState(Replica.ReplicaState.NORMAL);
+
+        Replica replica3 = new Replica();
+        replica3.setVersionCount(-1); // negative count; presumably means "unknown" (confirm in Replica); expected last
+        replica3.setState(Replica.ReplicaState.NORMAL);
+
+        Replica replica4 = new Replica();
+        replica4.setVersionCount(200); // largest count; expected just before the -1 replica
+        replica4.setState(Replica.ReplicaState.NORMAL);
+
+        replicaList.add(replica1); // insertion order is 100, 50, -1, 200 (deliberately unsorted)
+        replicaList.add(replica2);
+        replicaList.add(replica3);
+        replicaList.add(replica4);
+
+        Collections.sort(replicaList, countComparator); // expected order afterwards: 50, 100, 200, -1
+        Assert.assertEquals(50, replicaList.get(0).getVersionCount());
+        Assert.assertEquals(100, replicaList.get(1).getVersionCount());
+        Assert.assertEquals(200, replicaList.get(2).getVersionCount());
+        Assert.assertEquals(-1, replicaList.get(3).getVersionCount()); // -1 must sort last, not first as natural int order would give
+    }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org