You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/09/17 02:11:47 UTC
[incubator-doris] branch master updated: [Bug] Fix some bugs (#6665)
This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new fee8e6a [Bug] Fix some bugs (#6665)
fee8e6a is described below
commit fee8e6afc56709d1616e310fc9bb229e63439630
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Sep 17 10:11:37 2021 +0800
[Bug] Fix some bugs (#6665)
1.Fix a potential BE coredump of sending batch when loading data. (Fix [Bug] BE crash when loading data #6656)
2.Fix a potential BE coredump when doing schema change. (Fix [Bug] BE crash when doing alter task #6657)
3.Optimize the metric of base_compaction_request_failed.
4.Add Order column in show tablet result. (Fix [Feature] Add order column in SHOW TABLET stmt result #6658)
5.Fix bug that tablet repair slot not being released. (Fix [Bug] Tablet scheduler stop working #6659)
6.Fix bug that REPLICA_MISSING error can not be handled. (Fix [Bug] REPLICA_MISSING error can not be handled. #6660)
7.Modify column name of SHOW PROC "/cluster_balance/cluster_load_stat"
8.Optimize the result of SHOW PROC "/statistic" to show COLOCATE_MISMATCH tablets (Fix [Feature] the health status of colocate table's tablet is not shown in show proc statistic #6663)
9.Fix bug that show load where state='pending' can not be executed. (Fix [Bug] show load where state='pending' can not be executed. #6664)
---
be/src/exec/tablet_sink.cpp | 1 +
be/src/olap/schema_change.cpp | 8 ++-
be/src/olap/tablet.cpp | 2 +-
fe/fe-core/AlterRoutineLoadOperationLogTest | Bin 478 -> 0 bytes
fe/fe-core/diskInfoTest | Bin 158 -> 0 bytes
.../org/apache/doris/analysis/ShowLoadStmt.java | 2 +-
.../org/apache/doris/analysis/ShowTabletStmt.java | 1 +
.../java/org/apache/doris/catalog/Replica.java | 6 +++
.../main/java/org/apache/doris/catalog/Tablet.java | 10 ++--
.../org/apache/doris/clone/TabletSchedCtx.java | 58 +++++++++++----------
.../org/apache/doris/clone/TabletScheduler.java | 10 ++--
.../doris/common/proc/ClusterLoadStatByTag.java | 30 ++++++-----
.../apache/doris/common/proc/StatisticProcDir.java | 31 +++++++----
.../java/org/apache/doris/load/LoadChecker.java | 3 +-
.../apache/doris/planner/DistributedPlanner.java | 4 +-
.../org/apache/doris/planner/OlapScanNode.java | 8 ++-
.../main/java/org/apache/doris/qe/Coordinator.java | 2 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 17 +++---
.../java/org/apache/doris/qe/StmtExecutor.java | 2 +-
.../apache/doris/analysis/ShowLoadStmtTest.java | 10 +++-
.../org/apache/doris/clone/TabletSchedCtxTest.java | 37 +++++++++++++
21 files changed, 162 insertions(+), 80 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f780971..9c1e31c 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -845,6 +845,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
_stop_background_threads_latch.count_down();
if (_sender_thread) {
_sender_thread->join();
+ _send_batch_thread_pool_token->shutdown();
}
Expr::close(_output_expr_ctxs, state);
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 04e5070..49a7f96 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1573,7 +1573,11 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
}
for (auto& rs_reader : rs_readers) {
- rs_reader->init(&reader_context);
+ res = rs_reader->init(&reader_context);
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
+ break;
+ }
}
} while (0);
@@ -1717,7 +1721,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader));
- rowset_reader->init(&reader_context);
+ RETURN_NOT_OK(rowset_reader->init(&reader_context));
RowsetWriterContext writer_context;
writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1746c5d..c1471fa 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1345,8 +1345,8 @@ int64_t Tablet::prepare_compaction_and_calculate_permits(CompactionType compacti
OLAPStatus res = _base_compaction->prepare_compact();
if (res != OLAP_SUCCESS) {
set_last_base_compaction_failure_time(UnixMillis());
- DorisMetrics::instance()->base_compaction_request_failed->increment(1);
if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) {
+ DorisMetrics::instance()->base_compaction_request_failed->increment(1);
LOG(WARNING) << "failed to pick rowsets for base compaction. res=" << res
<< ", tablet=" << full_name();
}
diff --git a/fe/fe-core/AlterRoutineLoadOperationLogTest b/fe/fe-core/AlterRoutineLoadOperationLogTest
deleted file mode 100644
index ae3953e..0000000
Binary files a/fe/fe-core/AlterRoutineLoadOperationLogTest and /dev/null differ
diff --git a/fe/fe-core/diskInfoTest b/fe/fe-core/diskInfoTest
deleted file mode 100644
index 866d039..0000000
Binary files a/fe/fe-core/diskInfoTest and /dev/null differ
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
index 610477e..afee714 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowLoadStmt.java
@@ -219,7 +219,7 @@ public class ShowLoadStmt extends ShowStmt {
break CHECK;
}
- if (!isAccurateMatch && !value.contains("%")) {
+ if (hasLabel && !isAccurateMatch && !value.contains("%")) {
value = "%" + value + "%";
}
if (hasLabel) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java
index ac8063a..d31db26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTabletStmt.java
@@ -284,6 +284,7 @@ public class ShowTabletStmt extends ShowStmt {
builder.addColumn(new Column("PartitionId", ScalarType.createVarchar(30)));
builder.addColumn(new Column("IndexId", ScalarType.createVarchar(30)));
builder.addColumn(new Column("IsSync", ScalarType.createVarchar(30)));
+ builder.addColumn(new Column("Order", ScalarType.createVarchar(30)));
builder.addColumn(new Column("DetailCmd", ScalarType.createVarchar(30)));
} else {
for (String title : TabletsProcDir.TITLE_NAMES) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 42a7790..70f4709 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -563,4 +563,10 @@ public class Replica implements Writable {
public long getWatermarkTxnId() {
return watermarkTxnId;
}
+
+ public boolean isAlive() {
+ return getState() != ReplicaState.CLONE
+ && getState() != ReplicaState.DECOMMISSION
+ && !isBad();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index f1d3b01..cec8d8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -27,9 +27,6 @@ import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -37,6 +34,9 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -419,9 +419,7 @@ public class Tablet extends MetaObject implements Writable {
Set<String> hosts = Sets.newHashSet();
for (Replica replica : replicas) {
Backend backend = systemInfoService.getBackend(replica.getBackendId());
- if (backend == null || !backend.isAlive() || replica.getState() == ReplicaState.CLONE
- || replica.getState() == ReplicaState.DECOMMISSION
- || replica.isBad() || !hosts.add(backend.getHost())) {
+ if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())) {
// this replica is not alive,
// or if this replica is on same host with another replica, we also treat it as 'dead',
// so that Tablet Scheduler will create a new replica on different host.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 18e66b1..b34af4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -54,6 +54,8 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -97,7 +99,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
* from the tablet scheduler.
*/
private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;
-
+
+ private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new VersionCountComparator();
+
public enum Type {
BALANCE, REPAIR
}
@@ -504,50 +508,33 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
if (replica.getLastFailedVersion() > 0) {
continue;
}
-
+
if (!replica.checkVersionCatchUp(visibleVersion, visibleVersionHash, false)) {
continue;
}
-
+
candidates.add(replica);
}
-
+
if (candidates.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find source replica");
}
// choose a replica which slot is available from candidates.
- long minVersionCount = Long.MAX_VALUE;
- boolean findSrcReplica = false;
+ // sort replica by version count asc, so that we prefer to choose replicas with fewer versions
+ Collections.sort(candidates, VERSION_COUNTER_COMPARATOR);
for (Replica srcReplica : candidates) {
PathSlot slot = backendsWorkingSlots.get(srcReplica.getBackendId());
if (slot == null) {
continue;
}
-
+
long srcPathHash = slot.takeSlot(srcReplica.getPathHash());
if (srcPathHash != -1) {
- if (!findSrcReplica) {
- // version count is set by report process, so it may not be set yet and default value is -1.
- // so we need to check it.
- minVersionCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount();
- setSrc(srcReplica);
- findSrcReplica = true;
- } else {
- long curVerCount = srcReplica.getVersionCount() == -1 ? Long.MAX_VALUE : srcReplica.getVersionCount();
- if (curVerCount < minVersionCount) {
- minVersionCount = curVerCount;
- setSrc(srcReplica);
- findSrcReplica = true;
- }
- }
+ setSrc(srcReplica);
+ return;
}
}
-
- if (findSrcReplica) {
- return;
- }
-
throw new SchedException(Status.SCHEDULE_FAILED, "unable to find source slot");
}
@@ -623,7 +610,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
if (destPathHash == -1) {
throw new SchedException(Status.SCHEDULE_FAILED, "unable to take slot of dest path");
}
-
if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of VERSION_INCOMPLETE,
// it means that this replica needs to be able to accept loading data.
@@ -666,7 +652,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
}
}
-
+
if (destPathHash != -1) {
PathSlot slot = tabletScheduler.getBackendsWorkingSlots().get(destBackendId);
if (slot != null) {
@@ -1102,4 +1088,20 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
return sb.toString();
}
+
+ // Comparator to sort the replica with version count, asc
+ public static class VersionCountComparator implements Comparator<Replica> {
+ @Override
+ public int compare(Replica r1, Replica r2) {
+ long verCount1 = r1.getVersionCount() == -1 ? Long.MAX_VALUE : r1.getVersionCount();
+ long verCount2 = r2.getVersionCount() == -1 ? Long.MAX_VALUE : r2.getVersionCount();
+ if (verCount1 < verCount2) {
+ return -1;
+ } else if (verCount1 > verCount2) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 907c917..d63772c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -638,7 +638,7 @@ public class TabletScheduler extends MasterDaemon {
Map<Tag, Short> currentAllocMap = Maps.newHashMap();
for (Replica replica : replicas) {
Backend be = infoService.getBackend(replica.getBackendId());
- if (be != null) {
+ if (be != null && be.isAlive() && replica.isAlive()) {
Short num = currentAllocMap.getOrDefault(be.getTag(), (short) 0);
currentAllocMap.put(be.getTag(), (short) (num + 1));
}
@@ -1196,11 +1196,11 @@ public class TabletScheduler extends MasterDaemon {
PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
if (slot == null) {
- LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId());
continue;
}
- if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) {
+ long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
+ if (pathHash != -1) {
return rootPathLoadStatistic;
}
}
@@ -1209,11 +1209,11 @@ public class TabletScheduler extends MasterDaemon {
for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
PathSlot slot = backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
if (slot == null) {
- LOG.debug("backend {} does not found when getting slots", rootPathLoadStatistic.getBeId());
continue;
}
- if (slot.takeSlot(rootPathLoadStatistic.getPathHash()) != -1) {
+ long pathHash = slot.takeSlot(rootPathLoadStatistic.getPathHash());
+ if (pathHash != -1) {
return rootPathLoadStatistic;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java
index 095143c..0f2fb8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java
@@ -34,25 +34,15 @@ import java.util.Set;
// SHOW PROC "/cluster_balance/cluster_load_stat"
public class ClusterLoadStatByTag implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add(
- "StorageMedium").build();
-
- private Map<String, Tag> tagMap = Maps.newHashMap();
+ "Tag").build();
@Override
public ProcResult fetchResult() throws AnalysisException {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
- List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
- Set<Tag> tags = Sets.newHashSet();
- for (long beId : beIds) {
- Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
- if (be != null) {
- tags.add(be.getTag());
- }
- }
+ Set<Tag> tags = genTagMap();
for (Tag tag : tags) {
result.addRow(Lists.newArrayList(tag.toKey()));
- tagMap.put(tag.toKey(), tag);
}
return result;
}
@@ -64,6 +54,11 @@ public class ClusterLoadStatByTag implements ProcDirInterface {
@Override
public ProcNodeInterface lookup(String name) throws AnalysisException {
+ Set<Tag> tags = genTagMap();
+ Map<String, Tag> tagMap = Maps.newHashMap();
+ for (Tag tag : tags) {
+ tagMap.put(tag.toKey(), tag);
+ }
Tag tag = tagMap.get(name);
if (tag == null) {
throw new AnalysisException("No such tag: " + name);
@@ -71,4 +66,15 @@ public class ClusterLoadStatByTag implements ProcDirInterface {
return new ClusterLoadStatByTagAndMedium(tag);
}
+ private Set<Tag> genTagMap() {
+ Set<Tag> tags = Sets.newHashSet();
+ List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+ for (long beId : beIds) {
+ Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
+ if (be != null) {
+ tags.add(be.getTag());
+ }
+ }
+ return tags;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
index e8255f4..3670747 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcDir.java
@@ -18,6 +18,7 @@
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -133,10 +134,13 @@ public class StatisticProcDir implements ProcDirInterface {
.stream().map(AgentTask::getTabletId).collect(Collectors.toSet());
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+ ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex();
List<Long> aliveBeIdsInCluster = infoService.getClusterBackendIds(db.getClusterName(), true);
db.getTables().stream().filter(t -> t != null && t.getType() == TableType.OLAP).forEach(t -> {
++tableNum;
OlapTable olapTable = (OlapTable) t;
+ ColocateTableIndex.GroupId groupId = colocateTableIndex.isColocateTable(olapTable.getId()) ?
+ colocateTableIndex.getGroup(olapTable.getId()) : null;
olapTable.readLock();
try {
for (Partition partition : olapTable.getAllPartitions()) {
@@ -144,21 +148,30 @@ public class StatisticProcDir implements ProcDirInterface {
++partitionNum;
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
++indexNum;
- for (Tablet tablet : materializedIndex.getTablets()) {
+ List<Tablet> tablets = materializedIndex.getTablets();
+ for (int i = 0; i < tablets.size(); ++i) {
+ Tablet tablet = tablets.get(i);
++tabletNum;
replicaNum += tablet.getReplicas().size();
- Pair<TabletStatus, Priority> res = tablet.getHealthStatusWithPriority(
- infoService, db.getClusterName(),
- partition.getVisibleVersion(), partition.getVisibleVersionHash(),
- replicaAlloc, aliveBeIdsInCluster);
+ TabletStatus res = null;
+ if (groupId != null) {
+ Set<Long> backendsSet = colocateTableIndex.getTabletBackendsByGroup(groupId, i);
+ res = tablet.getColocateHealthStatus(partition.getVisibleVersion(), replicaAlloc, backendsSet);
+ } else {
+ Pair<TabletStatus, Priority> pair = tablet.getHealthStatusWithPriority(
+ infoService, db.getClusterName(),
+ partition.getVisibleVersion(), partition.getVisibleVersionHash(),
+ replicaAlloc, aliveBeIdsInCluster);
+ res = pair.first;
+ }
// here we treat REDUNDANT as HEALTHY, for user friendly.
- if (res.first != TabletStatus.HEALTHY && res.first != TabletStatus.REDUNDANT
- && res.first != TabletStatus.COLOCATE_REDUNDANT && res.first != TabletStatus.NEED_FURTHER_REPAIR
- && res.first != TabletStatus.UNRECOVERABLE) {
+ if (res != TabletStatus.HEALTHY && res != TabletStatus.REDUNDANT
+ && res != TabletStatus.COLOCATE_REDUNDANT && res != TabletStatus.NEED_FURTHER_REPAIR
+ && res != TabletStatus.UNRECOVERABLE) {
unhealthyTabletIds.add(tablet.getId());
- } else if (res.first == TabletStatus.UNRECOVERABLE) {
+ } else if (res == TabletStatus.UNRECOVERABLE) {
unrecoverableTabletIds.add(tablet.getId());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index 32f36f6..3793acc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -170,7 +170,8 @@ public class LoadChecker extends MasterDaemon {
task = new HadoopLoadPendingTask(job);
break;
default:
- LOG.warn("unknown etl job type. type: {}", etlJobType.name());
+ LOG.warn("unknown etl job type. type: {}, job id: {}, label: {}, db: {}",
+ etlJobType.name(), job.getId(), job.getLabel(), job.getDbId());
break;
}
if (task != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 8c3b609..7a166ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -182,7 +182,7 @@ public class DistributedPlanner {
*/
private PlanFragment createPlanFragments(
PlanNode root, boolean isPartitioned,
- long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException, AnalysisException {
+ long perNodeMemLimit, ArrayList<PlanFragment> fragments) throws UserException {
ArrayList<PlanFragment> childFragments = Lists.newArrayList();
for (PlanNode child : root.getChildren()) {
// allow child fragments to be partitioned, unless they contain a limit clause
@@ -272,7 +272,7 @@ public class DistributedPlanner {
* fragment
* TODO: hbase scans are range-partitioned on the row key
*/
- private PlanFragment createScanFragment(PlanNode node) {
+ private PlanFragment createScanFragment(PlanNode node) throws UserException {
if (node instanceof MysqlScanNode || node instanceof OdbcScanNode) {
return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED);
} else if (node instanceof SchemaScanNode) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 3423487..246788c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -903,14 +903,18 @@ public class OlapScanNode extends ScanNode {
The reason is that @coordicator will not set the scan range for the fragment,
when data partition of fragment is UNPARTITION.
*/
- public DataPartition constructInputPartitionByDistributionInfo() {
+ public DataPartition constructInputPartitionByDistributionInfo() throws UserException {
ColocateTableIndex colocateTableIndex = Catalog.getCurrentColocateIndex();
if ((colocateTableIndex.isColocateTable(olapTable.getId())
&& !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId())))
|| olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED
|| olapTable.getPartitions().size() == 1) {
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
- Preconditions.checkState(distributionInfo instanceof HashDistributionInfo);
+ if (!(distributionInfo instanceof HashDistributionInfo)) {
+ // There may be some random distribution table left, throw exception here.
+ // And these table should be modified to hash distribution by ALTER TABLE operation.
+ throw new UserException("Table with non hash distribution is not supported: " + olapTable.getName());
+ }
List<Column> distributeColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
List<Expr> dataDistributeExprs = Lists.newArrayList();
for (Column column : distributeColumns) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2ded32b..e4196dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -596,7 +596,7 @@ public class Coordinator {
cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
- throw new UserException("send fragment timeout. backend id: " + pair.first.backend.getId());
+ throw new RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: " + pair.first.backend.getId());
case THRIFT_RPC_ERROR:
SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
throw new RpcException(pair.first.backend.getHost(), "rpc failed");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 8778104..e7f7003 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -155,16 +155,16 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUnit;
import org.apache.doris.transaction.GlobalTransactionMgr;
-import org.apache.commons.lang3.tuple.Triple;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -1387,6 +1387,7 @@ public class ShowExecutor {
String indexName = FeConstants.null_string;
Boolean isSync = true;
+ int tabletIdx = -1;
// check real meta
do {
Database db = catalog.getDbNullable(dbId);
@@ -1425,6 +1426,8 @@ public class ShowExecutor {
break;
}
+ tabletIdx = index.getTabletOrderIdx(tablet.getId());
+
List<Replica> replicas = tablet.getReplicas();
for (Replica replica : replicas) {
Replica tmp = invertedIndex.getReplica(tabletId, replica.getBackendId());
@@ -1447,9 +1450,9 @@ public class ShowExecutor {
String detailCmd = String.format("SHOW PROC '/dbs/%d/%d/partitions/%d/%d/%d';",
dbId, tableId, partitionId, indexId, tabletId);
rows.add(Lists.newArrayList(dbName, tableName, partitionName, indexName,
- dbId.toString(), tableId.toString(),
- partitionId.toString(), indexId.toString(),
- isSync.toString(), detailCmd));
+ dbId.toString(), tableId.toString(),
+ partitionId.toString(), indexId.toString(),
+ isSync.toString(), String.valueOf(tabletIdx), detailCmd));
} else {
Database db = catalog.getDbOrAnalysisException(showStmt.getDbName());
OlapTable olapTable = db.getOlapTableOrAnalysisException(showStmt.getTableName());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c885273..d581e70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -402,7 +402,7 @@ public class StmtExecutor implements ProfileWriter {
throw e;
} catch (UserException e) {
// analysis exception only print message, not print the stack
- LOG.warn("execute Exception. {}", e);
+ LOG.warn("execute Exception. {}", e.getMessage());
context.getState().setError(e.getMessage());
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
} catch (Exception e) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java
index b9027f3..66d807a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/ShowLoadStmtTest.java
@@ -17,7 +17,6 @@
package org.apache.doris.analysis;
-import mockit.Expectations;
import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FakeCatalog;
@@ -29,6 +28,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import mockit.Expectations;
+
public class ShowLoadStmtTest {
private Analyzer analyzer;
private Catalog catalog;
@@ -110,10 +111,15 @@ public class ShowLoadStmtTest {
StringLiteral stringLiteralLike = new StringLiteral("ab%");
LikePredicate likePredicate = new LikePredicate(org.apache.doris.analysis.LikePredicate.Operator.LIKE,
- slotRef, stringLiteralLike);
+ slotRef, stringLiteralLike);
stmt = new ShowLoadStmt(null, likePredicate, null, new LimitElement(10));
stmt.analyze(analyzer);
Assert.assertEquals("SHOW LOAD FROM `testCluster:testDb` WHERE `label` LIKE \'ab%\' LIMIT 10", stmt.toString());
+
+ BinaryPredicate statePredicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "state"), new StringLiteral("PENDING"));
+ stmt = new ShowLoadStmt(null, statePredicate, null, new LimitElement(10));
+ stmt.analyze(analyzer);
+ Assert.assertEquals("SHOW LOAD FROM `testCluster:testDb` WHERE `state` = \'PENDING\' LIMIT 10", stmt.toString());
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index b9d9ed2..e9d3119 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -17,13 +17,18 @@
package org.apache.doris.clone;
+import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
+import com.google.common.collect.Lists;
+
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
import java.util.PriorityQueue;
public class TabletSchedCtxTest {
@@ -75,4 +80,36 @@ public class TabletSchedCtxTest {
Assert.assertEquals(ctx2.getTabletId(), expectedCtx.getTabletId());
}
+ @Test
+ public void testVersionCountComparator() {
+ TabletSchedCtx.VersionCountComparator countComparator = new TabletSchedCtx.VersionCountComparator();
+ List<Replica> replicaList = Lists.newArrayList();
+ Replica replica1 = new Replica();
+ replica1.setVersionCount(100);
+ replica1.setState(Replica.ReplicaState.NORMAL);
+
+ Replica replica2 = new Replica();
+ replica2.setVersionCount(50);
+ replica2.setState(Replica.ReplicaState.NORMAL);
+
+ Replica replica3 = new Replica();
+ replica3.setVersionCount(-1);
+ replica3.setState(Replica.ReplicaState.NORMAL);
+
+ Replica replica4 = new Replica();
+ replica4.setVersionCount(200);
+ replica4.setState(Replica.ReplicaState.NORMAL);
+
+ replicaList.add(replica1);
+ replicaList.add(replica2);
+ replicaList.add(replica3);
+ replicaList.add(replica4);
+
+ Collections.sort(replicaList, countComparator);
+ Assert.assertEquals(50, replicaList.get(0).getVersionCount());
+ Assert.assertEquals(100, replicaList.get(1).getVersionCount());
+ Assert.assertEquals(200, replicaList.get(2).getVersionCount());
+ Assert.assertEquals(-1, replicaList.get(3).getVersionCount());
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org