You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by wa...@apache.org on 2021/03/12 06:24:51 UTC

[incubator-doris] branch master updated: (#5390)fix NPE when replay colocate group (#5391)

This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 543ed46  (#5390)fix NPE when replay colocate group (#5391)
543ed46 is described below

commit 543ed46bc3d65bc9c050dd346ff72f5cad9e27ad
Author: wangbo <50...@qq.com>
AuthorDate: Fri Mar 12 14:24:34 2021 +0800

    (#5390)fix NPE when replay colocate group (#5391)
    
    * (#5390)fix NPE when replay colocate group
    
    * remove table id from colocate group when duplicate create table
    
    * remove tablet id when duplicate create table,just like ddlexception
    
    * add ut
---
 .../java/org/apache/doris/catalog/Catalog.java     | 51 ++++++++++++++--------
 .../apache/doris/catalog/ColocateTableIndex.java   |  6 +++
 .../java/org/apache/doris/catalog/Database.java    |  8 +++-
 .../org/apache/doris/catalog/InfoSchemaDb.java     |  5 ++-
 .../apache/doris/catalog/TabletInvertedIndex.java  | 20 +++++++++
 .../org/apache/doris/catalog/CreateTableTest.java  | 34 +++++++++++++++
 .../org/apache/doris/catalog/InfoSchemaDbTest.java |  2 +-
 7 files changed, 102 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 7f79320..cd54710 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3791,22 +3791,35 @@ public class Catalog {
                 throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
             }
 
-            if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) {
+            Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
+            if (!result.first) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists");
             }
 
-            // we have added these index to memory, only need to persist here
-            if (getColocateTableIndex().isColocateTable(tableId)) {
-                GroupId groupId = getColocateTableIndex().getGroup(tableId);
-                List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
-                ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
-                editLog.logColocateAddTable(info);
-            }
-            LOG.info("successfully create table[{};{}]", tableName, tableId);
-            // register or remove table from DynamicPartition after table created
-            DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
-            dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
-                    tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
+            if (result.second) {
+                if (getColocateTableIndex().isColocateTable(tableId)) {
+                    // if this is a colocate table, its table id has already been added to the colocate group,
+                    // so we should remove the tableId here
+                    getColocateTableIndex().removeTable(tableId);
+                }
+                for (Long tabletId : tabletIdSet) {
+                    Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+                }
+                LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
+            } else {
+                // these indexes have already been added in memory; we only need to persist them here
+                if (getColocateTableIndex().isColocateTable(tableId)) {
+                    GroupId groupId = getColocateTableIndex().getGroup(tableId);
+                    List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
+                    ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
+                    editLog.logColocateAddTable(info);
+                }
+                LOG.info("successfully create table[{};{}]", tableName, tableId);
+                // register or remove table from DynamicPartition after table created
+                DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
+                dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
+                        tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
+            }
         } catch (DdlException e) {
             for (Long tabletId : tabletIdSet) {
                 Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
@@ -3829,7 +3842,7 @@ public class Catalog {
         long tableId = Catalog.getCurrentCatalog().getNextId();
         MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
         mysqlTable.setComment(stmt.getComment());
-        if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists())) {
+        if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
         }
         LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -3844,7 +3857,7 @@ public class Catalog {
         long tableId = Catalog.getCurrentCatalog().getNextId();
         OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
         odbcTable.setComment(stmt.getComment());
-        if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists())) {
+        if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
         }
         LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -3875,7 +3888,7 @@ public class Catalog {
         EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
         esTable.setComment(stmt.getComment());
 
-        if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists())) {
+        if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
         }
         LOG.info("successfully create table{} with id {}", tableName, tableId);
@@ -3892,7 +3905,7 @@ public class Catalog {
         brokerTable.setComment(stmt.getComment());
         brokerTable.setBrokerProperties(stmt.getExtProperties());
 
-        if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists())) {
+        if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
         }
         LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -3906,7 +3919,7 @@ public class Catalog {
         long tableId = getNextId();
         HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
         hiveTable.setComment(stmt.getComment());
-        if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists())) {
+        if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
             ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
         }
         LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -5553,7 +5566,7 @@ public class Catalog {
             throw new DdlException("failed to init view stmt", e);
         }
       
-        if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists())) {
+        if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists()).first) {
             throw new DdlException("Failed to create view[" + tableName + "].");
         }
         LOG.info("successfully create view[" + tableName + "-" + newView.getId() + "]");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 7c5fd98..dc6bf25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -700,4 +700,10 @@ public class ColocateTableIndex implements Writable {
             writeUnlock();
         }
     }
+
+    // for unit tests only
+    public Map<Long, GroupId> getTable2Group() {
+        return table2Group;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 202c8eb..c1d6f97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -281,13 +281,17 @@ public class Database extends MetaObject implements Writable {
         checkReplicaQuota();
     }
 
-    public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
+    public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
         boolean result = true;
+        // if the table already exists, then the edit log won't be written;
+        // some callers of this method need to know about this case
+        boolean isTableExist = false;
         writeLock();
         try {
             String tableName = table.getName();
             if (nameToTable.containsKey(tableName)) {
                 result = setIfNotExist;
+                isTableExist = true;
             } else {
                 idToTable.put(table.getId(), table);
                 nameToTable.put(table.getName(), table);
@@ -301,7 +305,7 @@ public class Database extends MetaObject implements Writable {
                     Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable)table);
                 }
             }
-            return result;
+            return Pair.create(result, isTableExist);
         } finally {
             writeUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java
index e577d4f..78666f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java
@@ -18,6 +18,7 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.SystemIdGenerator;
 
 import java.io.DataInput;
@@ -39,8 +40,8 @@ public class InfoSchemaDb extends Database {
     }
 
     @Override
-    public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
-        return false;
+    public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
+        return Pair.create(false, false);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index edad1e0..3e424e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -695,5 +695,25 @@ public class TabletInvertedIndex {
             this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
         }
     }
+
+    // for unit tests only
+    public Table<Long, Long, Replica> getReplicaMetaTable() {
+        return replicaMetaTable;
+    }
+
+    // for unit tests only
+    public Table<Long, Long, Replica> getBackingReplicaMetaTable() {
+        return backingReplicaMetaTable;
+    }
+
+    // for unit tests only
+    public Table<Long, Long, TabletMeta> getTabletMetaTable() {
+        return tabletMetaTable;
+    }
+
+    // for unit tests only
+    public Map<Long, TabletMeta> getTabletMetaMap() {
+        return tabletMetaMap;
+    }
 }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index dab5fbd..6bfdb87 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -32,6 +32,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 
 public class CreateTableTest {
@@ -63,6 +65,38 @@ public class CreateTableTest {
     }
 
     @Test
+    public void testDuplicateCreateTable() throws Exception{
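+        // creating the same colocate table twice with "if not exists" must leave the tablet and colocate indexes unchanged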
+        Catalog catalog = Catalog.getCurrentCatalog();
+        String sql = "create table if not exists test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n"
+                + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1','colocate_with'='test'); ";
+        createTable(sql);
+        Set<Long> tabletIdSetAfterCreateFirstTable = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet();
+        Set<TabletMeta> tabletMetaSetBeforeCreateFirstTable = new HashSet<>();
+        catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletMetaSetBeforeCreateFirstTable.add(tabletMeta);});
+        Set<Long> colocateTableIdBeforeCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet();
+        Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.size() > 0);
+        Assert.assertTrue(tabletIdSetAfterCreateFirstTable.size() > 0);
+
+        createTable(sql);
+        // check whether tablet is cleared after duplicate create table
+        Set<Long> tabletIdSetAfterDuplicateCreateTable1 = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet();
+        Set<Long> tabletIdSetAfterDuplicateCreateTable2 = catalog.getTabletInvertedIndex().getBackingReplicaMetaTable().columnKeySet();
+        Set<Long> tabletIdSetAfterDuplicateCreateTable3 = catalog.getTabletInvertedIndex().getTabletMetaMap().keySet();
+        Set<TabletMeta> tabletIdSetAfterDuplicateCreateTable4 = new HashSet<>();
+        catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletIdSetAfterDuplicateCreateTable4.add(tabletMeta);});
+
+        Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable1));
+        Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable2));
+        Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable3));
+        Assert.assertTrue(tabletMetaSetBeforeCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable4));
+
+        // check whether table id is cleared from colocate group after duplicate create table
+        Set<Long> colocateTableIdAfterCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet();
+        Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.equals(colocateTableIdAfterCreateFirstTable));
+    }
+
+    @Test
     public void testNormal() throws DdlException {
         ExceptionChecker.expectThrowsNoException(
                 () -> createTable("create table test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
index 08a47a8..1cf7fdd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
@@ -28,7 +28,7 @@ public class InfoSchemaDbTest {
         Database db = new InfoSchemaDb();
 
         Assert.assertFalse(db.createTable(null));
-        Assert.assertFalse(db.createTableWithLock(null, false, false));
+        Assert.assertFalse(db.createTableWithLock(null, false, false).first);
         db.dropTable("authors");
         db.write(null);
         Assert.assertNull(db.getTable("authors"));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org