You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/06 09:20:42 UTC
[incubator-doris] 03/11: [fix](catalog) fix bug that replica missing version cause query -214 error (#9266)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 05cf433aa6e528206ae68112ca7c9ccd5c575788
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue May 3 17:54:19 2022 +0800
[fix](catalog) fix bug that replica missing version cause query -214 error (#9266)
1. Fix bug described in #9267
When a replica is reported as missing a version, set its last failed version to (replica version + 1).
2. Skip non-existent partitions when handling transactions.
---
.../java/org/apache/doris/catalog/Catalog.java | 5 +-
.../java/org/apache/doris/catalog/Replica.java | 2 +-
.../org/apache/doris/master/ReportHandler.java | 18 ++++---
.../apache/doris/persist/BackendReplicasInfo.java | 18 +++++--
.../doris/transaction/DatabaseTransactionMgr.java | 60 +++++++++-------------
.../doris/transaction/GlobalTransactionMgr.java | 4 +-
.../doris/transaction/PublishVersionDaemon.java | 6 +--
.../doris/persist/BackendReplicaInfosTest.java | 3 +-
.../transaction/GlobalTransactionMgrTest.java | 9 ++--
9 files changed, 60 insertions(+), 65 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index f3e187065a..ccae1c3b5f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -6965,10 +6965,7 @@ public class Catalog {
replica.setBad(true);
break;
case MISSING_VERSION:
- // The absolute value is meaningless, as long as it is greater than 0.
- // This way, in other checking logic, if lastFailedVersion is found to be greater than 0,
- // it will be considered a version missing replica and will be handled accordingly.
- replica.setLastFailedVersion(1L);
+ replica.updateLastFailedVersion(info.lastFailedVersion);
break;
default:
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index a42cebdc8a..930305db98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -309,7 +309,7 @@ public class Replica implements Writable {
if (newVersion < this.version) {
// This case means that replica meta version has been updated by ReportHandler before
- // For example, the publish version daemon has already sent some publish verison tasks to one be to publish version 2, 3, 4, 5, 6,
+ // For example, the publish version daemon has already sent some publish version tasks to one be to publish version 2, 3, 4, 5, 6,
// and the be finish all publish version tasks, the be's replica version is 6 now, but publish version daemon need to wait
// for other be to finish most of publish version tasks to update replica version in fe.
// At the moment, the replica version in fe is 4, when ReportHandler sync tablet, it find reported replica version in be is 6 and then
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d0e7b9ae5b..05643aa7e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -729,8 +729,8 @@ public class ReportHandler extends Daemon {
AgentTaskExecutor.submit(batchTask);
}
- LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId);
- LOG.info("add {} replica(s) to meta. backend[{}]", addToMetaCounter, backendId);
+ LOG.info("delete {} tablet(s) and add {} replica(s) to meta from backend[{}]",
+ deleteFromBackendCounter, addToMetaCounter, backendId);
}
// replica is used and no version missing
@@ -854,11 +854,15 @@ public class ReportHandler extends Daemon {
}
if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
- // The absolute value is meaningless, as long as it is greater than 0.
- // This way, in other checking logic, if lastFailedVersion is found to be greater than 0,
- // it will be considered a version missing replica and will be handled accordingly.
- replica.setLastFailedVersion(1L);
- backendReplicasInfo.addMissingVersionReplica(tabletId);
+ // If the origin last failed version is larger than 0, not change it.
+ // Otherwise, we set last failed version to replica'version + 1.
+ // Because last failed version should always larger than replica's version.
+ long newLastFailedVersion = replica.getLastFailedVersion();
+ if (newLastFailedVersion < 0) {
+ newLastFailedVersion = replica.getVersion() + 1;
+ }
+ replica.updateLastFailedVersion(newLastFailedVersion);
+ backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
break;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java
index c382e1fdb5..80222f4618 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java
@@ -44,11 +44,11 @@ public class BackendReplicasInfo implements Writable {
}
public void addBadReplica(long tabletId) {
- replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.BAD));
+ replicaReportInfos.add(new ReplicaReportInfo(tabletId, -1, ReportInfoType.BAD));
}
- public void addMissingVersionReplica(long tabletId) {
- replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.MISSING_VERSION));
+ public void addMissingVersionReplica(long tabletId, long lastFailedVersion) {
+ replicaReportInfos.add(new ReplicaReportInfo(tabletId, lastFailedVersion, ReportInfoType.MISSING_VERSION));
}
public long getBackendId() {
@@ -84,9 +84,12 @@ public class BackendReplicasInfo implements Writable {
public long tabletId;
@SerializedName(value = "type")
public ReportInfoType type;
+ @SerializedName(value = "lastFailedVersion")
+ public long lastFailedVersion;
- public ReplicaReportInfo(long tabletId, ReportInfoType type) {
+ public ReplicaReportInfo(long tabletId, long lastFailedVersion, ReportInfoType type) {
this.tabletId = tabletId;
+ this.lastFailedVersion = lastFailedVersion;
this.type = type;
}
@@ -98,7 +101,12 @@ public class BackendReplicasInfo implements Writable {
public static ReplicaReportInfo read(DataInput in) throws IOException {
String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class);
+ ReplicaReportInfo info = GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class);
+ if (info.type == ReportInfoType.MISSING_VERSION && info.lastFailedVersion <= 0) {
+ // FIXME(cmy): Just for compatibility, should be remove in v1.2
+ info.lastFailedVersion = 1;
+ }
+ return info;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 83585ffe32..9ce906fb79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -569,7 +569,7 @@ public class DatabaseTransactionMgr {
TxnCommitAttachment txnCommitAttachment, Boolean is2PC)
throws UserException {
// check status
- // the caller method already own db lock, we do not obtain db lock here
+ // the caller method already own tables' write lock
Database db = catalog.getDbOrMetaException(dbId);
TransactionState transactionState;
readLock();
@@ -644,7 +644,7 @@ public class DatabaseTransactionMgr {
LOG.info("transaction:[{}] successfully committed", transactionState);
}
- public boolean publishTransaction(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException {
+ public boolean waitForTransactionFinished(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException {
TransactionState transactionState = null;
readLock();
try {
@@ -857,48 +857,24 @@ public class DatabaseTransactionMgr {
}
}
+ // check success replica number for each tablet.
+ // a success replica means:
+ // 1. Not in errorReplicaIds: succeed in both commit and publish phase
+ // 2. last failed version < 0: is a health replica before
+ // 3. version catch up: not with a stale version
+ // Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
int healthReplicaNum = 0;
for (Replica replica : tablet.getReplicas()) {
- if (!errorReplicaIds.contains(replica.getId())
- && replica.getLastFailedVersion() < 0) {
- // this means the replica is a healthy replica,
- // it is healthy in the past and does not have error in current load
+ if (!errorReplicaIds.contains(replica.getId()) && replica.getLastFailedVersion() < 0) {
if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
- // during rollup, the rollup replica's last failed version < 0,
- // it may be treated as a normal replica.
- // the replica is not failed during commit or publish
- // during upgrade, one replica's last version maybe invalid,
- // has to compare version hash.
-
- // Here we still update the replica's info even if we failed to publish
- // this txn, for the following case:
- // replica A,B,C is successfully committed, but only A is successfully
- // published,
- // B and C is crashed, now we need a Clone task to repair this tablet.
- // So, here we update A's version info, so that clone task will clone
- // the latest version of data.
-
- replica.updateVersionInfo(partitionCommitInfo.getVersion(),
- replica.getDataSize(), replica.getRowCount());
++healthReplicaNum;
- } else {
- // this means the replica has error in the past, but we did not observe it
- // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica
- // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback
- // then we will detect this and set C's last failed version to 10 and last success version to 11
- // this logic has to be replayed in checkpoint thread
- replica.updateVersionInfo(replica.getVersion(),
- partition.getVisibleVersion(),
- partitionCommitInfo.getVersion());
- LOG.warn("transaction state {} has error, the replica [{}] not appeared in error replica list "
- + " and its version not equal to partition commit version or commit version - 1"
- + " if its not a upgrade stage, its a fatal error. ", transactionState, replica);
}
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
// the replica's version is larger than or equal to current transaction partition's version
// the replica is normal, then remove it from error replica ids
+ // TODO(cmy): actually I have no idea why we need this check
errorReplicaIds.remove(replica.getId());
++healthReplicaNum;
}
@@ -1490,14 +1466,19 @@ public class DatabaseTransactionMgr {
for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
long partitionId = partitionCommitInfo.getPartitionId();
Partition partition = table.getPartition(partitionId);
+ if (partition == null) {
+ LOG.warn("partition {} of table {} does not exist when update catalog after committed. transaction: {}, db: {}",
+ partitionId, tableId, transactionState.getTransactionId(), db.getId());
+ continue;
+ }
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
for (MaterializedIndex index : allIndices) {
List<Tablet> tablets = index.getTablets();
for (Tablet tablet : tablets) {
for (Replica replica : tablet.getReplicas()) {
if (errorReplicaIds.contains(replica.getId())) {
- // should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally
- // should get from transaction state
+ // TODO(cmy): do we need to update last failed version here?
+ // because in updateCatalogAfterVisible, it will be updated again.
replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
}
}
@@ -1522,6 +1503,11 @@ public class DatabaseTransactionMgr {
long partitionId = partitionCommitInfo.getPartitionId();
long newCommitVersion = partitionCommitInfo.getVersion();
Partition partition = table.getPartition(partitionId);
+ if (partition == null) {
+ LOG.warn("partition {} in table {} does not exist when update catalog after visible. transaction: {}, db: {}",
+ partitionId, tableId, transactionState.getTransactionId(), db.getId());
+ continue;
+ }
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
@@ -1531,7 +1517,7 @@ public class DatabaseTransactionMgr {
long lastSuccessVersion = replica.getLastSuccessVersion();
if (!errorReplicaIds.contains(replica.getId())) {
if (replica.getLastFailedVersion() > 0) {
- // if the replica is a failed replica, then not changing version and version hash
+ // if the replica is a failed replica, then not changing version
newVersion = replica.getVersion();
} else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
// this means the replica has error in the past, but we did not observe it
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index b51cc9c0d9..9161eb525e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable {
* @throws UserException
* @throws TransactionCommitFailedException
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
- * @note callers should get db.write lock before call this api
+ * @note callers should get all tables' write locks before call this api
*/
public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
TxnCommitAttachment txnCommitAttachment)
@@ -263,7 +263,7 @@ public class GlobalTransactionMgr implements Writable {
// so we just return false to indicate publish timeout
return false;
}
- return dbTransactionMgr.publishTransaction(db, transactionId, publishTimeoutMillis);
+ return dbTransactionMgr.waitForTransactionFinished(db, transactionId, publishTimeoutMillis);
}
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 99da777962..7b592742f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -79,10 +79,7 @@ public class PublishVersionDaemon extends MasterDaemon {
return;
}
- // TODO yiguolei: could publish transaction state according to multi-tenant cluster info
- // but should do more work. for example, if a table is migrate from one cluster to another cluster
- // should publish to two clusters.
- // attention here, we publish transaction state to all backends including dead backend, if not publish to dead backend
+ // ATTN, we publish transaction state to all backends including dead backend, if not publish to dead backend
// then transaction manager will treat it as success
List<Long> allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false);
if (allBackends.isEmpty()) {
@@ -198,7 +195,6 @@ public class PublishVersionDaemon extends MasterDaemon {
continue;
}
-
for (long tableId : transactionState.getTableIdList()) {
Table table = db.getTableNullable(tableId);
if (table == null || table.getType() != Table.TableType.OLAP) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java
index 54ceaf44b1..e5a29a9860 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java
@@ -49,7 +49,7 @@ public class BackendReplicaInfosTest {
BackendReplicasInfo info = new BackendReplicasInfo(beId);
info.addBadReplica(tabletId1);
- info.addMissingVersionReplica(tabletId2);
+ info.addMissingVersionReplica(tabletId2, 11);
checkInfo(info);
info.write(dos);
dos.flush();
@@ -73,6 +73,7 @@ public class BackendReplicaInfosTest {
Assert.assertEquals(BackendReplicasInfo.ReportInfoType.BAD, reportInfo.type);
} else if (reportInfo.tabletId == tabletId2) {
Assert.assertEquals(BackendReplicasInfo.ReportInfoType.MISSING_VERSION, reportInfo.type);
+ Assert.assertEquals(11, reportInfo.lastFailedVersion);
} else {
Assert.fail("unknown tablet id: " + reportInfo.tabletId);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 3816f73d72..abdb01f940 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -67,12 +67,13 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import mockit.Injectable;
-import mockit.Mocked;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import mockit.Injectable;
+import mockit.Mocked;
+
public class GlobalTransactionMgrTest {
private static FakeEditLog fakeEditLog;
@@ -526,7 +527,9 @@ public class GlobalTransactionMgrTest {
Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
Replica replica3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
- assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion());
+ // because after calling `finishTransaction`, the txn state is COMMITTED, not VISIBLE,
+ // so all replicas' version are not changed.
+ assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
assertEquals(-1, replica1.getLastFailedVersion());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org