You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/06 09:20:42 UTC

[incubator-doris] 03/11: [fix](catalog) fix bug where a replica with a missing version causes query error -214 (#9266)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 05cf433aa6e528206ae68112ca7c9ccd5c575788
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue May 3 17:54:19 2022 +0800

    [fix](catalog) fix bug where a replica with a missing version causes query error -214 (#9266)
    
    1. Fix the bug described in #9267:
        when a replica with a missing version is reported, set its last failed version to (replica version + 1).
    2. Skip non-existent partitions when handling transactions.
---
 .../java/org/apache/doris/catalog/Catalog.java     |  5 +-
 .../java/org/apache/doris/catalog/Replica.java     |  2 +-
 .../org/apache/doris/master/ReportHandler.java     | 18 ++++---
 .../apache/doris/persist/BackendReplicasInfo.java  | 18 +++++--
 .../doris/transaction/DatabaseTransactionMgr.java  | 60 +++++++++-------------
 .../doris/transaction/GlobalTransactionMgr.java    |  4 +-
 .../doris/transaction/PublishVersionDaemon.java    |  6 +--
 .../doris/persist/BackendReplicaInfosTest.java     |  3 +-
 .../transaction/GlobalTransactionMgrTest.java      |  9 ++--
 9 files changed, 60 insertions(+), 65 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index f3e187065a..ccae1c3b5f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -6965,10 +6965,7 @@ public class Catalog {
                     replica.setBad(true);
                     break;
                 case MISSING_VERSION:
-                    // The absolute value is meaningless, as long as it is greater than 0.
-                    // This way, in other checking logic, if lastFailedVersion is found to be greater than 0,
-                    // it will be considered a version missing replica and will be handled accordingly.
-                    replica.setLastFailedVersion(1L);
+                    replica.updateLastFailedVersion(info.lastFailedVersion);
                     break;
                 default:
                     break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index a42cebdc8a..930305db98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -309,7 +309,7 @@ public class Replica implements Writable {
 
         if (newVersion < this.version) {
             // This case means that replica meta version has been updated by ReportHandler before
-            // For example, the publish version daemon has already sent some publish verison tasks to one be to publish version 2, 3, 4, 5, 6,
+            // For example, the publish version daemon has already sent some publish version tasks to one be to publish version 2, 3, 4, 5, 6,
             // and the be finish all publish version tasks, the be's replica version is 6 now, but publish version daemon need to wait
             // for other be to finish most of publish version tasks to update replica version in fe.
             // At the moment, the replica version in fe is 4, when ReportHandler sync tablet, it find reported replica version in be is 6 and then
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d0e7b9ae5b..05643aa7e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -729,8 +729,8 @@ public class ReportHandler extends Daemon {
             AgentTaskExecutor.submit(batchTask);
         }
 
-        LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId);
-        LOG.info("add {} replica(s) to meta. backend[{}]", addToMetaCounter, backendId);
+        LOG.info("delete {} tablet(s) and add {} replica(s) to meta from backend[{}]",
+                deleteFromBackendCounter, addToMetaCounter, backendId);
     }
 
     // replica is used and no version missing
@@ -854,11 +854,15 @@ public class ReportHandler extends Daemon {
                             }
 
                             if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
-                                // The absolute value is meaningless, as long as it is greater than 0.
-                                // This way, in other checking logic, if lastFailedVersion is found to be greater than 0,
-                                // it will be considered a version missing replica and will be handled accordingly.
-                                replica.setLastFailedVersion(1L);
-                                backendReplicasInfo.addMissingVersionReplica(tabletId);
+                                // If the origin last failed version is larger than 0, not change it.
+                                // Otherwise, we set last failed version to replica'version + 1.
+                                // Because last failed version should always larger than replica's version.
+                                long newLastFailedVersion = replica.getLastFailedVersion();
+                                if (newLastFailedVersion < 0) {
+                                    newLastFailedVersion = replica.getVersion() + 1;
+                                }
+                                replica.updateLastFailedVersion(newLastFailedVersion);
+                                backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
                                 break;
                             }
                         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java
index c382e1fdb5..80222f4618 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java
@@ -44,11 +44,11 @@ public class BackendReplicasInfo implements Writable {
     }
 
     public void addBadReplica(long tabletId) {
-        replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.BAD));
+        replicaReportInfos.add(new ReplicaReportInfo(tabletId, -1, ReportInfoType.BAD));
     }
 
-    public void addMissingVersionReplica(long tabletId) {
-        replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.MISSING_VERSION));
+    public void addMissingVersionReplica(long tabletId, long lastFailedVersion) {
+        replicaReportInfos.add(new ReplicaReportInfo(tabletId, lastFailedVersion, ReportInfoType.MISSING_VERSION));
     }
 
     public long getBackendId() {
@@ -84,9 +84,12 @@ public class BackendReplicasInfo implements Writable {
         public long tabletId;
         @SerializedName(value = "type")
         public ReportInfoType type;
+        @SerializedName(value = "lastFailedVersion")
+        public long lastFailedVersion;
 
-        public ReplicaReportInfo(long tabletId, ReportInfoType type) {
+        public ReplicaReportInfo(long tabletId, long lastFailedVersion, ReportInfoType type) {
             this.tabletId = tabletId;
+            this.lastFailedVersion = lastFailedVersion;
             this.type = type;
         }
 
@@ -98,7 +101,12 @@ public class BackendReplicasInfo implements Writable {
 
         public static ReplicaReportInfo read(DataInput in) throws IOException {
             String json = Text.readString(in);
-            return GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class);
+            ReplicaReportInfo info = GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class);
+            if (info.type == ReportInfoType.MISSING_VERSION && info.lastFailedVersion <= 0) {
+                // FIXME(cmy): Just for compatibility, should be remove in v1.2
+                info.lastFailedVersion = 1;
+            }
+            return info;
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 83585ffe32..9ce906fb79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -569,7 +569,7 @@ public class DatabaseTransactionMgr {
                                   TxnCommitAttachment txnCommitAttachment, Boolean is2PC)
             throws UserException {
         // check status
-        // the caller method already own db lock, we do not obtain db lock here
+        // the caller method already own tables' write lock
         Database db = catalog.getDbOrMetaException(dbId);
         TransactionState transactionState;
         readLock();
@@ -644,7 +644,7 @@ public class DatabaseTransactionMgr {
         LOG.info("transaction:[{}] successfully committed", transactionState);
     }
 
-    public boolean publishTransaction(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException {
+    public boolean waitForTransactionFinished(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException {
         TransactionState transactionState = null;
         readLock();
         try {
@@ -857,48 +857,24 @@ public class DatabaseTransactionMgr {
                         }
                     }
 
+                    // check success replica number for each tablet.
+                    // a success replica means:
+                    //  1. Not in errorReplicaIds: succeed in both commit and publish phase
+                    //  2. last failed version < 0: is a health replica before
+                    //  3. version catch up: not with a stale version
+                    // Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
                     for (MaterializedIndex index : allIndices) {
                         for (Tablet tablet : index.getTablets()) {
                             int healthReplicaNum = 0;
                             for (Replica replica : tablet.getReplicas()) {
-                                if (!errorReplicaIds.contains(replica.getId())
-                                        && replica.getLastFailedVersion() < 0) {
-                                    // this means the replica is a healthy replica,
-                                    // it is healthy in the past and does not have error in current load
+                                if (!errorReplicaIds.contains(replica.getId()) && replica.getLastFailedVersion() < 0) {
                                     if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
-                                        // during rollup, the rollup replica's last failed version < 0,
-                                        // it may be treated as a normal replica.
-                                        // the replica is not failed during commit or publish
-                                        // during upgrade, one replica's last version maybe invalid,
-                                        // has to compare version hash.
-
-                                        // Here we still update the replica's info even if we failed to publish
-                                        // this txn, for the following case:
-                                        // replica A,B,C is successfully committed, but only A is successfully
-                                        // published,
-                                        // B and C is crashed, now we need a Clone task to repair this tablet.
-                                        // So, here we update A's version info, so that clone task will clone
-                                        // the latest version of data.
-
-                                        replica.updateVersionInfo(partitionCommitInfo.getVersion(),
-                                                replica.getDataSize(), replica.getRowCount());
                                         ++healthReplicaNum;
-                                    } else {
-                                        // this means the replica has error in the past, but we did not observe it
-                                        // during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica
-                                        // A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback
-                                        // then we will detect this and set C's last failed version to 10 and last success version to 11
-                                        // this logic has to be replayed in checkpoint thread
-                                        replica.updateVersionInfo(replica.getVersion(),
-                                                partition.getVisibleVersion(), 
-                                                partitionCommitInfo.getVersion());
-                                        LOG.warn("transaction state {} has error, the replica [{}] not appeared in error replica list "
-                                                + " and its version not equal to partition commit version or commit version - 1"
-                                                + " if its not a upgrade stage, its a fatal error. ", transactionState, replica);
                                     }
                                 } else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
                                     // the replica's version is larger than or equal to current transaction partition's version
                                     // the replica is normal, then remove it from error replica ids
+                                    // TODO(cmy): actually I have no idea why we need this check
                                     errorReplicaIds.remove(replica.getId());
                                     ++healthReplicaNum;
                                 }
@@ -1490,14 +1466,19 @@ public class DatabaseTransactionMgr {
             for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
                 long partitionId = partitionCommitInfo.getPartitionId();
                 Partition partition = table.getPartition(partitionId);
+                if (partition == null) {
+                    LOG.warn("partition {} of table {} does not exist when update catalog after committed. transaction: {}, db: {}",
+                            partitionId, tableId, transactionState.getTransactionId(), db.getId());
+                    continue;
+                }
                 List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
                 for (MaterializedIndex index : allIndices) {
                     List<Tablet> tablets = index.getTablets();
                     for (Tablet tablet : tablets) {
                         for (Replica replica : tablet.getReplicas()) {
                             if (errorReplicaIds.contains(replica.getId())) {
-                                // should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally
-                                // should get from transaction state
+                                // TODO(cmy): do we need to update last failed version here?
+                                // because in updateCatalogAfterVisible, it will be updated again.
                                 replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
                             }
                         }
@@ -1522,6 +1503,11 @@ public class DatabaseTransactionMgr {
                 long partitionId = partitionCommitInfo.getPartitionId();
                 long newCommitVersion = partitionCommitInfo.getVersion();
                 Partition partition = table.getPartition(partitionId);
+                if (partition == null) {
+                    LOG.warn("partition {} in table {} does not exist when update catalog after visible. transaction: {}, db: {}",
+                            partitionId, tableId, transactionState.getTransactionId(), db.getId());
+                    continue;
+                }
                 List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
                 for (MaterializedIndex index : allIndices) {
                     for (Tablet tablet : index.getTablets()) {
@@ -1531,7 +1517,7 @@ public class DatabaseTransactionMgr {
                             long lastSuccessVersion = replica.getLastSuccessVersion();
                             if (!errorReplicaIds.contains(replica.getId())) {
                                 if (replica.getLastFailedVersion() > 0) {
-                                    // if the replica is a failed replica, then not changing version and version hash
+                                    // if the replica is a failed replica, then not changing version
                                     newVersion = replica.getVersion();
                                 } else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
                                     // this means the replica has error in the past, but we did not observe it
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index b51cc9c0d9..9161eb525e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable {
      * @throws UserException
      * @throws TransactionCommitFailedException
      * @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
-     * @note callers should get db.write lock before call this api
+     * @note callers should get all tables' write locks before call this api
      */
     public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
                                   TxnCommitAttachment txnCommitAttachment)
@@ -263,7 +263,7 @@ public class GlobalTransactionMgr implements Writable {
             // so we just return false to indicate publish timeout
             return false;
         }
-        return dbTransactionMgr.publishTransaction(db, transactionId, publishTimeoutMillis);
+        return dbTransactionMgr.waitForTransactionFinished(db, transactionId, publishTimeoutMillis);
     }
 
     public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
index 99da777962..7b592742f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java
@@ -79,10 +79,7 @@ public class PublishVersionDaemon extends MasterDaemon {
             return;
         }
 
-        // TODO yiguolei: could publish transaction state according to multi-tenant cluster info
-        // but should do more work. for example, if a table is migrate from one cluster to another cluster
-        // should publish to two clusters.
-        // attention here, we publish transaction state to all backends including dead backend, if not publish to dead backend
+        // ATTN, we publish transaction state to all backends including dead backend, if not publish to dead backend
         // then transaction manager will treat it as success
         List<Long> allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false);
         if (allBackends.isEmpty()) {
@@ -198,7 +195,6 @@ public class PublishVersionDaemon extends MasterDaemon {
                             continue;
                         }
 
-
                         for (long tableId : transactionState.getTableIdList()) {
                             Table table = db.getTableNullable(tableId);
                             if (table == null || table.getType() != Table.TableType.OLAP) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java
index 54ceaf44b1..e5a29a9860 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java
@@ -49,7 +49,7 @@ public class BackendReplicaInfosTest {
 
         BackendReplicasInfo info = new BackendReplicasInfo(beId);
         info.addBadReplica(tabletId1);
-        info.addMissingVersionReplica(tabletId2);
+        info.addMissingVersionReplica(tabletId2, 11);
         checkInfo(info);
         info.write(dos);
         dos.flush();
@@ -73,6 +73,7 @@ public class BackendReplicaInfosTest {
                 Assert.assertEquals(BackendReplicasInfo.ReportInfoType.BAD, reportInfo.type);
             } else if (reportInfo.tabletId == tabletId2) {
                 Assert.assertEquals(BackendReplicasInfo.ReportInfoType.MISSING_VERSION, reportInfo.type);
+                Assert.assertEquals(11, reportInfo.lastFailedVersion);
             } else {
                 Assert.fail("unknown tablet id: " + reportInfo.tabletId);
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 3816f73d72..abdb01f940 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -67,12 +67,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import mockit.Injectable;
-import mockit.Mocked;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import mockit.Injectable;
+import mockit.Mocked;
+
 public class GlobalTransactionMgrTest {
 
     private static FakeEditLog fakeEditLog;
@@ -526,7 +527,9 @@ public class GlobalTransactionMgrTest {
         Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
         Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
         Replica replica3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
-        assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion());
+        // because after calling `finishTransaction`, the txn state is COMMITTED, not VISIBLE,
+        // so all replicas' version are not changed.
+        assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
         assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
         assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
         assertEquals(-1, replica1.getLastFailedVersion());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org