You are viewing a plain-text version of this content. The link to the canonical version ("here") was a hyperlink and is not preserved in this plain-text rendering.
Posted to commits@doris.apache.org by ca...@apache.org on 2021/04/15 07:56:14 UTC
[incubator-doris] branch master updated: [Bug] Fix alter table
failed when none of new load jobs succeed on alter replica (#5617)
This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b6c0767 [Bug] Fix alter table failed when none of new load jobs succeed on alter replica (#5617)
b6c0767 is described below
commit b6c0767754f8723da20d1fde961eb54fcd75f167
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Apr 15 15:55:57 2021 +0800
[Bug] Fix alter table failed when none of new load jobs succeed on alter replica (#5617)
* [Bug] Fix alter table failed when none of new load jobs succeed on altering replica
Co-authored-by: caiconghui <ca...@xiaomi.com>
---
.../java/org/apache/doris/alter/AlterHandler.java | 22 ++++++----------------
.../java/org/apache/doris/alter/AlterJobV2.java | 1 +
.../java/org/apache/doris/alter/RollupJobV2.java | 12 ++++++------
.../org/apache/doris/alter/SchemaChangeJobV2.java | 14 +++++++-------
.../apache/doris/catalog/MaterializedIndex.java | 4 ++--
.../org/apache/doris/planner/OlapTableSink.java | 4 +++-
.../main/java/org/apache/doris/system/Backend.java | 2 +-
.../doris/transaction/GlobalTransactionMgr.java | 2 +-
8 files changed, 27 insertions(+), 34 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 54e7d0c..98e4e41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -393,16 +393,16 @@ public abstract class AlterHandler extends MasterDaemon {
* We assume that the specified version is X.
* Case 1:
* After alter table process starts, there is no new load job being submitted. So the new replica
- * should be with version (1-0). So we just modify the replica's version to partition's visible version, which is X.
+ * should be with version (0-1). So we just modify the replica's version to partition's visible version, which is X.
* Case 2:
* After alter table process starts, there are some load job being processed.
* Case 2.1:
- * Only one new load job, and it failed on this replica. so the replica's last failed version should be X + 1
- * and version is still 1. We should modify the replica's version to (last failed version - 1)
+ * None of them succeed on this replica. so the version is still 1. We should modify the replica's version to X.
* Case 2.2
* There are new load jobs after alter task, and at least one of them is succeed on this replica.
* So the replica's version should be larger than X. So we don't need to modify the replica version
* because its already looks like normal.
+ * In summary, we only need to update replica's version when replica's version is smaller than X
*/
public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundException {
Database db = Catalog.getCurrentCatalog().getDb(task.getDbId());
@@ -431,19 +431,9 @@ public abstract class AlterHandler extends MasterDaemon {
LOG.info("before handle alter task tablet {}, replica: {}, task version: {}-{}",
task.getSignature(), replica, task.getVersion(), task.getVersionHash());
boolean versionChanged = false;
- if (replica.getVersion() > task.getVersion()) {
- // Case 2.2, do nothing
- } else {
- if (replica.getLastFailedVersion() > task.getVersion()) {
- // Case 2.1
- replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount());
- versionChanged = true;
- } else {
- // Case 1
- Preconditions.checkState(replica.getLastFailedVersion() == -1, replica.getLastFailedVersion());
- replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount());
- versionChanged = true;
- }
+ if (replica.getVersion() < task.getVersion()) {
+ replica.updateVersionInfo(task.getVersion(), task.getVersionHash(), replica.getDataSize(), replica.getRowCount());
+ versionChanged = true;
}
if (versionChanged) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index e80ad31..15d5a39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -208,6 +208,7 @@ public abstract class AlterJobV2 implements Writable {
// table is stable, set is to ROLLUP and begin altering.
LOG.info("table {} is stable, start {} job {}", tableId, type);
tbl.setState(type == JobType.ROLLUP ? OlapTableState.ROLLUP : OlapTableState.SCHEMA_CHANGE);
+ errMsg = "";
return true;
}
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index c0d06d1..470d4c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -553,7 +553,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* Should replay all changes before this job's state transfer to PENDING.
* These changes should be same as changes in RollupHander.processAddRollup()
*/
- private void replayPending(RollupJobV2 replayedJob) {
+ private void replayCreateJob(RollupJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@@ -602,7 +602,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
- private void replayWaitingTxn(RollupJobV2 replayedJob) {
+ private void replayPendingJob(RollupJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@@ -633,7 +633,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
* Replay job in FINISHED state.
* Should replay all changes in runRuningJob()
*/
- private void replayFinished(RollupJobV2 replayedJob) {
+ private void replayRunningJob(RollupJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTable(tableId);
@@ -670,13 +670,13 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
RollupJobV2 replayedRollupJob = (RollupJobV2) replayedJob;
switch (replayedJob.jobState) {
case PENDING:
- replayPending(replayedRollupJob);
+ replayCreateJob(replayedRollupJob);
break;
case WAITING_TXN:
- replayWaitingTxn(replayedRollupJob);
+ replayPendingJob(replayedRollupJob);
break;
case FINISHED:
- replayFinished(replayedRollupJob);
+ replayRunningJob(replayedRollupJob);
break;
case CANCELLED:
replayCancelled(replayedRollupJob);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index ee568ce..665f376 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -665,7 +665,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
* Should replay all changes before this job's state transfer to PENDING.
* These changes should be same as changes in SchemaChangeHandler.createJob()
*/
- private void replayPending(SchemaChangeJobV2 replayedJob) {
+ private void replayCreateJob(SchemaChangeJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@@ -713,7 +713,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
* Replay job in WAITING_TXN state.
* Should replay all changes in runPendingJob()
*/
- private void replayWaitingTxn(SchemaChangeJobV2 replayedJob) {
+ private void replayPendingJob(SchemaChangeJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
// database may be dropped before replaying this log. just return
@@ -740,9 +740,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
/**
* Replay job in FINISHED state.
- * Should replay all changes in runRuningJob()
+ * Should replay all changes in runRunningJob()
*/
- private void replayFinished(SchemaChangeJobV2 replayedJob) {
+ private void replayRunningJob(SchemaChangeJobV2 replayedJob) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db != null) {
OlapTable tbl = (OlapTable) db.getTable(tableId);
@@ -777,13 +777,13 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
SchemaChangeJobV2 replayedSchemaChangeJob = (SchemaChangeJobV2) replayedJob;
switch (replayedJob.jobState) {
case PENDING:
- replayPending(replayedSchemaChangeJob);
+ replayCreateJob(replayedSchemaChangeJob);
break;
case WAITING_TXN:
- replayWaitingTxn(replayedSchemaChangeJob);
+ replayPendingJob(replayedSchemaChangeJob);
break;
case FINISHED:
- replayFinished(replayedSchemaChangeJob);
+ replayRunningJob(replayedSchemaChangeJob);
break;
case CANCELLED:
replayCancelled(replayedSchemaChangeJob);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
index a921758..92622a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java
@@ -46,13 +46,13 @@ public class MaterializedIndex extends MetaObject implements Writable, GsonPostP
SHADOW; // index in SHADOW state is visible to load process, but invisible to query
public boolean isVisible() {
- return this == IndexState.NORMAL || this == IndexState.SCHEMA_CHANGE;
+ return this == IndexState.NORMAL;
}
}
public enum IndexExtState {
ALL,
- VISIBLE, // index state in NORMAL and SCHEMA_CHANGE
+ VISIBLE, // index state in NORMAL
SHADOW // index state in SHADOW
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index bcc8202..0d9d4d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -17,6 +17,7 @@
package org.apache.doris.planner;
+import org.apache.commons.lang.StringUtils;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
@@ -300,7 +301,8 @@ public class OlapTableSink extends DataSink {
Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap();
if (bePathsMap.keySet().size() < quorum) {
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
- "tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size());
+ "tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size()
+ + ", alive backends: [" + StringUtils.join(bePathsMap.keySet(), ",") + "]");
}
locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
allBePathsMap.putAll(bePathsMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index b47506e..f1d68ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -648,7 +648,7 @@ public class Backend implements Writable {
} else {
if (isAlive.compareAndSet(true, false)) {
isChanged = true;
- LOG.info("{} is dead,", this.toString());
+ LOG.warn("{} is dead,", this.toString());
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 7dbc0d3..55795a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -204,7 +204,7 @@ public class GlobalTransactionMgr implements Writable {
long publishTimeoutMillis = timeoutMillis - stopWatch.getTime();
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(db.getId());
if (publishTimeoutMillis < 0) {
- // here commit transaction successfully cost too much time to cause publisTimeoutMillis is less than zero,
+ // here commit transaction successfully cost too much time to cause that publishTimeoutMillis is less than zero,
// so we just return false to indicate publish timeout
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org