You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/08 12:38:14 UTC
[doris] branch master updated: [improvement](multi-catalog)Support invalid/not invalid option for refresh catalog and db. (#14922)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1fcf6475cf [improvement](multi-catalog)Support invalid/not invalid option for refresh catalog and db. (#14922)
1fcf6475cf is described below
commit 1fcf6475cf224d04b9057563e8beb23b065be9d1
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Thu Dec 8 20:38:07 2022 +0800
[improvement](multi-catalog)Support invalid/not invalid option for refresh catalog and db. (#14922)
Current refresh catalog/db operation always invalid all the related cache. In some cases, it is not necessary,
for example, create a new db in external data source. This pr is to support refresh without invalidate cache.
refresh catalog hive properties("invalid_cache" = "false");
refresh database hive.db1 properties("invalid_cache" = "false");
---
fe/fe-core/src/main/cup/sql_parser.cup | 12 ++++++------
.../org/apache/doris/analysis/RefreshCatalogStmt.java | 15 ++++++++++++++-
.../java/org/apache/doris/analysis/RefreshDbStmt.java | 18 ++++++++++++++++--
.../java/org/apache/doris/catalog/RefreshManager.java | 7 ++++---
.../doris/catalog/external/ExternalDatabase.java | 8 ++++++--
.../org/apache/doris/datasource/CatalogFactory.java | 1 +
.../java/org/apache/doris/datasource/CatalogLog.java | 3 +++
.../java/org/apache/doris/datasource/CatalogMgr.java | 8 ++++----
.../org/apache/doris/datasource/EsExternalCatalog.java | 2 +-
.../org/apache/doris/datasource/ExternalCatalog.java | 12 ++++++++++--
.../org/apache/doris/datasource/ExternalObjectLog.java | 3 +++
.../apache/doris/datasource/HMSExternalCatalog.java | 2 +-
.../apache/doris/datasource/JdbcExternalCatalog.java | 2 +-
13 files changed, 70 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index ee07447d4c..8fb5b854da 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1137,13 +1137,13 @@ refresh_stmt ::=
{:
RESULT = new RefreshTableStmt(tbl);
:}
- | KW_REFRESH KW_DATABASE ident:db
+ | KW_REFRESH KW_DATABASE ident:db opt_properties:properties
{:
- RESULT = new RefreshDbStmt(db);
+ RESULT = new RefreshDbStmt(db, properties);
:}
- | KW_REFRESH KW_DATABASE ident:ctl DOT ident:db
+ | KW_REFRESH KW_DATABASE ident:ctl DOT ident:db opt_properties:properties
{:
- RESULT = new RefreshDbStmt(ctl, db);
+ RESULT = new RefreshDbStmt(ctl, db, properties);
:}
| KW_REFRESH KW_MATERIALIZED KW_VIEW table_name:mv
{:
@@ -1153,9 +1153,9 @@ refresh_stmt ::=
{:
RESULT = new RefreshMaterializedViewStmt(mv, MVRefreshInfo.RefreshMethod.COMPLETE);
:}
- | KW_REFRESH KW_CATALOG ident:catalogName
+ | KW_REFRESH KW_CATALOG ident:catalogName opt_properties:properties
{:
- RESULT = new RefreshCatalogStmt(catalogName);
+ RESULT = new RefreshCatalogStmt(catalogName, properties);
:}
;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java
index 481ebf3b9f..e054dca050 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshCatalogStmt.java
@@ -27,22 +27,32 @@ import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
+import java.util.Map;
+
/**
* RefreshCatalogStmt
* Manually refresh the catalog metadata.
*/
public class RefreshCatalogStmt extends DdlStmt {
+ private static final String INVALID_CACHE = "invalid_cache";
private final String catalogName;
+ private Map<String, String> properties;
+ private boolean invalidCache = false;
- public RefreshCatalogStmt(String catalogName) {
+ public RefreshCatalogStmt(String catalogName, Map<String, String> properties) {
this.catalogName = catalogName;
+ this.properties = properties;
}
public String getCatalogName() {
return catalogName;
}
+ public boolean isInvalidCache() {
+ return invalidCache;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@@ -56,6 +66,9 @@ public class RefreshCatalogStmt extends DdlStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CATALOG_ACCESS_DENIED,
analyzer.getQualifiedUser(), catalogName);
}
+ String invalidConfig = properties == null ? null : properties.get(INVALID_CACHE);
+ // Default is to invalid cache.
+ invalidCache = invalidConfig == null ? true : invalidConfig.equalsIgnoreCase("true");
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java
index 5118088916..ccd05a0524 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RefreshDbStmt.java
@@ -31,19 +31,26 @@ import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Map;
+
public class RefreshDbStmt extends DdlStmt {
private static final Logger LOG = LogManager.getLogger(RefreshDbStmt.class);
+ private static final String INVALID_CACHE = "invalid_cache";
private String catalogName;
private String dbName;
+ private Map<String, String> properties;
+ private boolean invalidCache = false;
- public RefreshDbStmt(String dbName) {
+ public RefreshDbStmt(String dbName, Map<String, String> properties) {
this.dbName = dbName;
+ this.properties = properties;
}
- public RefreshDbStmt(String catalogName, String dbName) {
+ public RefreshDbStmt(String catalogName, String dbName, Map<String, String> properties) {
this.catalogName = catalogName;
this.dbName = dbName;
+ this.properties = properties;
}
public String getDbName() {
@@ -54,6 +61,10 @@ public class RefreshDbStmt extends DdlStmt {
return catalogName;
}
+ public boolean isInvalidCache() {
+ return invalidCache;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
@@ -82,6 +93,9 @@ public class RefreshDbStmt extends DdlStmt {
ErrorReport.reportAnalysisException(
ErrorCode.ERR_DBACCESS_DENIED_ERROR, analyzer.getQualifiedUser(), dbName);
}
+ String invalidConfig = properties == null ? null : properties.get(INVALID_CACHE);
+ // Default is to invalid cache.
+ invalidCache = invalidConfig == null ? true : invalidConfig.equalsIgnoreCase("true");
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 558b133f8c..b8d6929994 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -76,7 +76,7 @@ public class RefreshManager {
refreshInternalCtlIcebergDb(dbName, env);
} else {
// Process external catalog db refresh
- refreshExternalCtlDb(dbName, catalog);
+ refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache());
}
LOG.info("Successfully refresh db: {}", dbName);
}
@@ -107,7 +107,7 @@ public class RefreshManager {
env.getIcebergTableCreationRecordMgr().registerDb(db);
}
- private void refreshExternalCtlDb(String dbName, CatalogIf catalog) throws DdlException {
+ private void refreshExternalCtlDb(String dbName, CatalogIf catalog, boolean invalidCache) throws DdlException {
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support refresh ExternalCatalog Database");
}
@@ -116,10 +116,11 @@ public class RefreshManager {
if (db == null) {
throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
}
- ((ExternalDatabase) db).setUnInitialized();
+ ((ExternalDatabase) db).setUnInitialized(invalidCache);
ExternalObjectLog log = new ExternalObjectLog();
log.setCatalogId(catalog.getId());
log.setDbId(db.getId());
+ log.setInvalidCache(invalidCache);
Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index 820360c21d..6ae8594c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -65,6 +65,7 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
@SerializedName(value = "initialized")
protected boolean initialized = false;
protected ExternalCatalog extCatalog;
+ protected boolean invalidCacheInInit = true;
/**
* No args constructor for persist.
@@ -93,9 +94,12 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
public void setTableExtCatalog(ExternalCatalog extCatalog) {
}
- public void setUnInitialized() {
+ public void setUnInitialized(boolean invalidCache) {
this.initialized = false;
- Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(extCatalog.getId(), name);
+ this.invalidCacheInInit = invalidCache;
+ if (invalidCache) {
+ Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(extCatalog.getId(), name);
+ }
}
public boolean isInitialized() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index f5704e909d..c5599ca411 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -54,6 +54,7 @@ public class CatalogFactory {
log.setNewCatalogName(((AlterCatalogNameStmt) stmt).getNewCatalogName());
} else if (stmt instanceof RefreshCatalogStmt) {
log.setCatalogId(catalogId);
+ log.setInvalidCache(((RefreshCatalogStmt) stmt).isInvalidCache());
} else {
throw new RuntimeException("Unknown stmt for catalog manager " + stmt.getClass().getSimpleName());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
index 0fc23af7b9..7e54e18386 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogLog.java
@@ -53,6 +53,9 @@ public class CatalogLog implements Writable {
@SerializedName(value = "newProps")
private Map<String, String> newProps;
+ @SerializedName(value = "invalidCache")
+ private boolean invalidCache;
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 7071433049..244aff98d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -113,12 +113,12 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
return catalog;
}
- private void unprotectedRefreshCatalog(long catalogId) {
+ private void unprotectedRefreshCatalog(long catalogId, boolean invalidCache) {
CatalogIf catalog = idToCatalog.get(catalogId);
if (catalog != null) {
String catalogName = catalog.getName();
if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
- ((ExternalCatalog) catalog).setUninitialized();
+ ((ExternalCatalog) catalog).setUninitialized(invalidCache);
}
}
}
@@ -431,7 +431,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
public void replayRefreshCatalog(CatalogLog log) throws DdlException {
writeLock();
try {
- unprotectedRefreshCatalog(log.getCatalogId());
+ unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache());
} finally {
writeUnlock();
}
@@ -487,7 +487,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
try {
ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
- db.setUnInitialized();
+ db.setUnInitialized(log.isInvalidCache());
} finally {
writeUnlock();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index 496eabfc8c..f078998fc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -134,7 +134,7 @@ public class EsExternalCatalog extends ExternalCatalog {
initCatalogLog.setCatalogId(id);
initCatalogLog.setType(InitCatalogLog.Type.ES);
if (dbNameToId != null && dbNameToId.containsKey(DEFAULT_DB)) {
- idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized();
+ idToDb.get(dbNameToId.get(DEFAULT_DB)).setUnInitialized(invalidCacheInInit);
initCatalogLog.addRefreshDb(dbNameToId.get(DEFAULT_DB));
} else {
dbNameToId = Maps.newConcurrentMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 2369e6afd4..cc0c2f0b2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -69,6 +69,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
// db name does not contains "default_cluster"
protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
private boolean objectCreated = false;
+ protected boolean invalidCacheInInit = true;
private ExternalSchemaCache schemaCache;
@@ -129,8 +130,15 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
// init schema related objects
protected abstract void init();
- public void setUninitialized() {
+ public void setUninitialized(boolean invalidCache) {
this.initialized = false;
+ this.invalidCacheInInit = invalidCache;
+ if (invalidCache) {
+ Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
+ }
+ }
+
+ public void updateDbList() {
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id);
}
@@ -209,7 +217,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
for (int i = 0; i < log.getRefreshCount(); i++) {
ExternalDatabase db = getDbForReplay(log.getRefreshDbIds().get(i));
- db.setUnInitialized();
+ db.setUnInitialized(invalidCacheInInit);
tmpDbNameToId.put(db.getFullName(), db.getId());
tmpIdToDb.put(db.getId(), db);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index cff446657e..030f981382 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -43,6 +43,9 @@ public class ExternalObjectLog implements Writable {
@SerializedName(value = "tableId")
private long tableId;
+ @SerializedName(value = "invalidCache")
+ private boolean invalidCache;
+
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 15cb40fe5c..90e6e49da2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -72,7 +72,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase db = idToDb.get(dbId);
- db.setUnInitialized();
+ db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java
index bcae5988d9..713c8176d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/JdbcExternalCatalog.java
@@ -122,7 +122,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
dbId = dbNameToId.get(dbName);
tmpDbNameToId.put(dbName, dbId);
ExternalDatabase db = idToDb.get(dbId);
- db.setUnInitialized();
+ db.setUnInitialized(invalidCacheInInit);
tmpIdToDb.put(dbId, db);
initCatalogLog.addRefreshDb(dbId);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org