You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by wa...@apache.org on 2021/03/12 06:24:51 UTC
[incubator-doris] branch master updated: (#5390)fix NPE when replay
colocate group (#5391)
This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 543ed46 (#5390)fix NPE when replay colocate group (#5391)
543ed46 is described below
commit 543ed46bc3d65bc9c050dd346ff72f5cad9e27ad
Author: wangbo <50...@qq.com>
AuthorDate: Fri Mar 12 14:24:34 2021 +0800
(#5390)fix NPE when replay colocate group (#5391)
* (#5390)fix NPE when replay colocate group
* remove the table id from the colocate group when a create table is a duplicate
* remove tablet ids when a create table is a duplicate, just like on DdlException
* add ut
---
.../java/org/apache/doris/catalog/Catalog.java | 51 ++++++++++++++--------
.../apache/doris/catalog/ColocateTableIndex.java | 6 +++
.../java/org/apache/doris/catalog/Database.java | 8 +++-
.../org/apache/doris/catalog/InfoSchemaDb.java | 5 ++-
.../apache/doris/catalog/TabletInvertedIndex.java | 20 +++++++++
.../org/apache/doris/catalog/CreateTableTest.java | 34 +++++++++++++++
.../org/apache/doris/catalog/InfoSchemaDbTest.java | 2 +-
7 files changed, 102 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 7f79320..cd54710 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3791,22 +3791,35 @@ public class Catalog {
throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name());
}
- if (!db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists())) {
+ Pair<Boolean, Boolean> result = db.createTableWithLock(olapTable, false, stmt.isSetIfNotExists());
+ if (!result.first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exists");
}
- // we have added these index to memory, only need to persist here
- if (getColocateTableIndex().isColocateTable(tableId)) {
- GroupId groupId = getColocateTableIndex().getGroup(tableId);
- List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
- ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
- editLog.logColocateAddTable(info);
- }
- LOG.info("successfully create table[{};{}]", tableName, tableId);
- // register or remove table from DynamicPartition after table created
- DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
- dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
- tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
+ if (result.second) {
+ if (getColocateTableIndex().isColocateTable(tableId)) {
+ // if this is a colocate join table, its table id is already added to colocate group
+ // so we should remove the tableId here
+ getColocateTableIndex().removeTable(tableId);
+ }
+ for (Long tabletId : tabletIdSet) {
+ Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ LOG.info("duplicate create table[{};{}], skip next steps", tableName, tableId);
+ } else {
+ // we have added these index to memory, only need to persist here
+ if (getColocateTableIndex().isColocateTable(tableId)) {
+ GroupId groupId = getColocateTableIndex().getGroup(tableId);
+ List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
+ ColocatePersistInfo info = ColocatePersistInfo.createForAddTable(groupId, tableId, backendsPerBucketSeq);
+ editLog.logColocateAddTable(info);
+ }
+ LOG.info("successfully create table[{};{}]", tableName, tableId);
+ // register or remove table from DynamicPartition after table created
+ DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
+ dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
+ tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
+ }
} catch (DdlException e) {
for (Long tabletId : tabletIdSet) {
Catalog.getCurrentInvertedIndex().deleteTablet(tabletId);
@@ -3829,7 +3842,7 @@ public class Catalog {
long tableId = Catalog.getCurrentCatalog().getNextId();
MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
mysqlTable.setComment(stmt.getComment());
- if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists())) {
+ if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -3844,7 +3857,7 @@ public class Catalog {
long tableId = Catalog.getCurrentCatalog().getNextId();
OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
odbcTable.setComment(stmt.getComment());
- if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists())) {
+ if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -3875,7 +3888,7 @@ public class Catalog {
EsTable esTable = new EsTable(tableId, tableName, baseSchema, stmt.getProperties(), partitionInfo);
esTable.setComment(stmt.getComment());
- if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists())) {
+ if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table{} with id {}", tableName, tableId);
@@ -3892,7 +3905,7 @@ public class Catalog {
brokerTable.setComment(stmt.getComment());
brokerTable.setBrokerProperties(stmt.getExtProperties());
- if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists())) {
+ if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -3906,7 +3919,7 @@ public class Catalog {
long tableId = getNextId();
HiveTable hiveTable = new HiveTable(tableId, tableName, columns, stmt.getProperties());
hiveTable.setComment(stmt.getComment());
- if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists())) {
+ if (!db.createTableWithLock(hiveTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, tableName, "table already exist");
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
@@ -5553,7 +5566,7 @@ public class Catalog {
throw new DdlException("failed to init view stmt", e);
}
- if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists())) {
+ if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists()).first) {
throw new DdlException("Failed to create view[" + tableName + "].");
}
LOG.info("successfully create view[" + tableName + "-" + newView.getId() + "]");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 7c5fd98..dc6bf25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -700,4 +700,10 @@ public class ColocateTableIndex implements Writable {
writeUnlock();
}
}
+
+ // just for ut
+ public Map<Long, GroupId> getTable2Group() {
+ return table2Group;
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 202c8eb..c1d6f97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -281,13 +281,17 @@ public class Database extends MetaObject implements Writable {
checkReplicaQuota();
}
- public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
+ public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
boolean result = true;
+ // if a table is already exists, then edit log won't be executed
+ // some caller of this method may need to know this message
+ boolean isTableExist = false;
writeLock();
try {
String tableName = table.getName();
if (nameToTable.containsKey(tableName)) {
result = setIfNotExist;
+ isTableExist = true;
} else {
idToTable.put(table.getId(), table);
nameToTable.put(table.getName(), table);
@@ -301,7 +305,7 @@ public class Database extends MetaObject implements Writable {
Catalog.getCurrentCatalog().getEsRepository().registerTable((EsTable)table);
}
}
- return result;
+ return Pair.create(result, isTableExist);
} finally {
writeUnlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java
index e577d4f..78666f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java
@@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.SystemIdGenerator;
import java.io.DataInput;
@@ -39,8 +40,8 @@ public class InfoSchemaDb extends Database {
}
@Override
- public boolean createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
- return false;
+ public Pair<Boolean, Boolean> createTableWithLock(Table table, boolean isReplay, boolean setIfNotExist) {
+ return Pair.create(false, false);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index edad1e0..3e424e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -695,5 +695,25 @@ public class TabletInvertedIndex {
this.beByReplicaCount = TreeMultimap.create(info.beByReplicaCount);
}
}
+
+ // just for ut
+ public Table<Long, Long, Replica> getReplicaMetaTable() {
+ return replicaMetaTable;
+ }
+
+ // just for ut
+ public Table<Long, Long, Replica> getBackingReplicaMetaTable() {
+ return backingReplicaMetaTable;
+ }
+
+ // just for ut
+ public Table<Long, Long, TabletMeta> getTabletMetaTable() {
+ return tabletMetaTable;
+ }
+
+ // just for ut
+ public Map<Long, TabletMeta> getTabletMetaMap() {
+ return tabletMetaMap;
+ }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index dab5fbd..6bfdb87 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -32,6 +32,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
public class CreateTableTest {
@@ -63,6 +65,38 @@ public class CreateTableTest {
}
@Test
+ public void testDuplicateCreateTable() throws Exception{
+ // test
+ Catalog catalog = Catalog.getCurrentCatalog();
+ String sql = "create table if not exists test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n"
+ + "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1','colocate_with'='test'); ";
+ createTable(sql);
+ Set<Long> tabletIdSetAfterCreateFirstTable = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet();
+ Set<TabletMeta> tabletMetaSetBeforeCreateFirstTable = new HashSet<>();
+ catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletMetaSetBeforeCreateFirstTable.add(tabletMeta);});
+ Set<Long> colocateTableIdBeforeCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet();
+ Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.size() > 0);
+ Assert.assertTrue(tabletIdSetAfterCreateFirstTable.size() > 0);
+
+ createTable(sql);
+ // check whether tablet is cleared after duplicate create table
+ Set<Long> tabletIdSetAfterDuplicateCreateTable1 = catalog.getTabletInvertedIndex().getReplicaMetaTable().rowKeySet();
+ Set<Long> tabletIdSetAfterDuplicateCreateTable2 = catalog.getTabletInvertedIndex().getBackingReplicaMetaTable().columnKeySet();
+ Set<Long> tabletIdSetAfterDuplicateCreateTable3 = catalog.getTabletInvertedIndex().getTabletMetaMap().keySet();
+ Set<TabletMeta> tabletIdSetAfterDuplicateCreateTable4 = new HashSet<>();
+ catalog.getTabletInvertedIndex().getTabletMetaTable().values().forEach(tabletMeta -> {tabletIdSetAfterDuplicateCreateTable4.add(tabletMeta);});
+
+ Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable1));
+ Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable2));
+ Assert.assertTrue(tabletIdSetAfterCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable3));
+ Assert.assertTrue(tabletMetaSetBeforeCreateFirstTable.equals(tabletIdSetAfterDuplicateCreateTable4));
+
+ // check whether table id is cleared from colocate group after duplicate create table
+ Set<Long> colocateTableIdAfterCreateFirstTable = catalog.getColocateTableIndex().getTable2Group().keySet();
+ Assert.assertTrue(colocateTableIdBeforeCreateFirstTable.equals(colocateTableIdAfterCreateFirstTable));
+ }
+
+ @Test
public void testNormal() throws DdlException {
ExceptionChecker.expectThrowsNoException(
() -> createTable("create table test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
index 08a47a8..1cf7fdd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java
@@ -28,7 +28,7 @@ public class InfoSchemaDbTest {
Database db = new InfoSchemaDb();
Assert.assertFalse(db.createTable(null));
- Assert.assertFalse(db.createTableWithLock(null, false, false));
+ Assert.assertFalse(db.createTableWithLock(null, false, false).first);
db.dropTable("authors");
db.write(null);
Assert.assertNull(db.getTable("authors"));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org