You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/03 05:59:19 UTC
[doris] branch master updated: [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 893f5f9345 [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)
893f5f9345 is described below
commit 893f5f93455883d3fab681e95afc47363eeeee25
Author: zhangdong <49...@qq.com>
AuthorDate: Tue Jan 3 13:59:14 2023 +0800
[feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)
Poll metastore for create/alter/drop operations on database, table, partition events at a given frequency.
By observing such events, we can take appropriate action on the (refresh/invalidate/add/remove)
so that represents the latest information available in metastore.
We keep track of the last synced event id in each polling
iteration so the next batch can be requested appropriately.
---
.../java/org/apache/doris/catalog/DatabaseIf.java | 2 +
.../main/java/org/apache/doris/catalog/Env.java | 7 +
.../doris/catalog/external/ExternalDatabase.java | 5 +
.../catalog/external/HMSExternalDatabase.java | 10 +
.../main/java/org/apache/doris/common/Config.java | 18 ++
.../org/apache/doris/datasource/CatalogMgr.java | 53 +++++-
.../apache/doris/datasource/ExternalCatalog.java | 3 +
.../doris/datasource/HMSExternalCatalog.java | 54 ++++++
.../datasource/PooledHiveMetaStoreClient.java | 27 +++
.../datasource/hive/event/DropTableEvent.java | 89 +++++++++
.../doris/datasource/hive/event/EventFactory.java | 32 ++++
.../doris/datasource/hive/event/IgnoredEvent.java | 43 +++++
.../datasource/hive/event/MetastoreEvent.java | 203 +++++++++++++++++++++
.../hive/event/MetastoreEventFactory.java | 81 ++++++++
.../datasource/hive/event/MetastoreEventType.java | 68 +++++++
.../hive/event/MetastoreEventsProcessor.java | 151 +++++++++++++++
.../hive/event/MetastoreNotificationException.java | 37 ++++
.../event/MetastoreNotificationFetchException.java | 37 ++++
.../datasource/hive/event/MetastoreTableEvent.java | 50 +++++
.../org/apache/doris/journal/JournalEntity.java | 1 +
.../java/org/apache/doris/persist/EditLog.java | 9 +
.../org/apache/doris/persist/OperationType.java | 2 +
22 files changed, 976 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
index ffd271d84e..f0dfa53484 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -214,4 +214,6 @@ public interface DatabaseIf<T extends TableIf> {
}
return (OlapTable) table;
}
+
+ void dropTable(String tableName);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1d95551df8..83bde25845 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
@@ -316,6 +317,7 @@ public class Env {
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+ private MetastoreEventsProcessor metastoreEventsProcessor;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
@@ -549,6 +551,7 @@ public class Env {
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector();
+ this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.replayedJournalId = new AtomicLong(0L);
this.isElectable = false;
@@ -1402,6 +1405,10 @@ public class Env {
if (Config.enable_fqdn_mode) {
fqdnManager.start();
}
+ if (Config.enable_hms_events_incremental_sync) {
+ metastoreEventsProcessor.start();
+ }
+
}
// start threads that should running on all FE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index 6ae8594c07..65c027713e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -258,4 +258,9 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
@Override
public void gsonPostProcess() throws IOException {}
+
+ @Override
+ public void dropTable(String tableName) {
+ throw new NotImplementedException();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index decef86caa..a1f6bcddab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -170,4 +170,14 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
idToTbl.put(tbl.getId(), tbl);
tableNameToId.put(tbl.getName(), tbl.getId());
}
+
+ @Override
+ public void dropTable(String tableName) {
+ LOG.debug("drop table [{}]", tableName);
+ Long tableId = tableNameToId.remove(tableName);
+ if (tableId == null) {
+ LOG.warn("drop table [{}] failed", tableName);
+ }
+ idToTbl.remove(tableId);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 2bc30113e6..93fbfc6e18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1935,5 +1935,23 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static boolean enable_func_pushdown = true;
+
+ /**
+ * If set to true, doris will automatically synchronize hms metadata to the cache in fe.
+ */
+ @ConfField(masterOnly = true)
+ public static boolean enable_hms_events_incremental_sync = false;
+
+ /**
+ * Maximum number of events to poll in each RPC.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int hms_events_batch_size_per_rpc = 500;
+
+ /**
+ * HMS polling interval in milliseconds.
+ */
+ @ConfField(masterOnly = true)
+ public static int hms_events_polling_interval_ms = 20000;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index a62562acd7..3f9a76bb7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.cluster.ClusterNamespace;
@@ -439,13 +440,17 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
* Refresh the catalog meta and write the meta log.
*/
public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
+ CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
+ }
+ CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
+ refreshCatalog(log);
+ }
+
+ public void refreshCatalog(CatalogLog log) {
writeLock();
try {
- CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
- if (catalog == null) {
- throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
- }
- CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
replayRefreshCatalog(log);
Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log);
} finally {
@@ -481,7 +486,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
/**
* Reply for refresh catalog event.
*/
- public void replayRefreshCatalog(CatalogLog log) throws DdlException {
+ public void replayRefreshCatalog(CatalogLog log) {
writeLock();
try {
unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache());
@@ -554,6 +559,42 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
.invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
}
+ public void dropExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
+ CatalogIf catalog = nameToCatalog.get(catalogName);
+ if (catalog == null) {
+ throw new DdlException("No catalog found with name: " + catalogName);
+ }
+ if (!(catalog instanceof ExternalCatalog)) {
+ throw new DdlException("Only support drop ExternalCatalog Tables");
+ }
+ DatabaseIf db = catalog.getDbNullable(dbName);
+ if (db == null) {
+ throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
+ }
+
+ TableIf table = db.getTableNullable(tableName);
+ if (table == null) {
+ throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
+ }
+ ExternalObjectLog log = new ExternalObjectLog();
+ log.setCatalogId(catalog.getId());
+ log.setDbId(db.getId());
+ log.setTableId(table.getId());
+ replayDropExternalTable(log);
+ Env.getCurrentEnv().getEditLog().logDropExternalTable(log);
+ }
+
+ public void replayDropExternalTable(ExternalObjectLog log) {
+ LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
+ log.getTableId());
+ ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+ ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+ ExternalTable table = db.getTableForReplay(log.getTableId());
+ db.dropTable(table.getName());
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
+ }
+
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 2d68d28fee..e730fb6bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -295,6 +295,9 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
db.setTableExtCatalog(this);
}
objectCreated = false;
+ if (this instanceof HMSExternalCatalog) {
+ ((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
+ }
}
public void addDatabaseForTest(ExternalDatabase db) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index e2a5f2f3a8..ec94867fc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -25,14 +25,20 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
@@ -42,8 +48,12 @@ import java.util.Map;
* External catalog for hive metastore compatible data sources.
*/
public class HMSExternalCatalog extends ExternalCatalog {
+ private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);
+
private static final int MAX_CLIENT_POOL_SIZE = 8;
protected PooledHiveMetaStoreClient client;
+ // Record the latest synced event id when processing hive events
+ private long lastSyncedEventId;
/**
* Default constructor for HMSExternalCatalog.
@@ -170,4 +180,48 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
return tmpSchema;
}
+
+ public void setLastSyncedEventId(long lastSyncedEventId) {
+ this.lastSyncedEventId = lastSyncedEventId;
+ }
+
+ public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
+ throws MetastoreNotificationFetchException {
+ makeSureInitialized();
+ if (lastSyncedEventId < 0) {
+ lastSyncedEventId = getCurrentEventId();
+ refreshCatalog(hmsExternalCatalog);
+ LOG.info(
+ "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+ + "lastSyncedEventId is [{}]",
+ hmsExternalCatalog.getName(), lastSyncedEventId);
+ return null;
+ }
+
+ long currentEventId = getCurrentEventId();
+ LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
+ hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
+ if (currentEventId == lastSyncedEventId) {
+ LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
+ return null;
+ }
+ return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
+ }
+
+ private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
+ CatalogLog log = new CatalogLog();
+ log.setCatalogId(hmsExternalCatalog.getId());
+ log.setInvalidCache(true);
+ Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log);
+ }
+
+ private long getCurrentEventId() {
+ makeSureInitialized();
+ CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId();
+ if (currentNotificationEventId == null) {
+ LOG.warn("Get currentNotificationEventId is null");
+ return -1;
+ }
+ return currentNotificationEventId.getEventId();
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
index 05e3d9d15c..008253c450 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.common.Config;
+import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.google.common.base.Preconditions;
@@ -29,8 +30,10 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
@@ -145,6 +148,30 @@ public class PooledHiveMetaStoreClient {
}
}
+ public CurrentNotificationEventId getCurrentNotificationEventId() {
+ try (CachedClient client = getClient()) {
+ return client.client.getCurrentNotificationEventId();
+ } catch (Exception e) {
+ LOG.warn("Failed to fetch current notification event id", e);
+ throw new MetastoreNotificationFetchException(
+ "Failed to get current notification event id. msg: " + e.getMessage());
+ }
+ }
+
+ public NotificationEventResponse getNextNotification(long lastEventId,
+ int maxEvents,
+ IMetaStoreClient.NotificationFilter filter)
+ throws MetastoreNotificationFetchException {
+ try (CachedClient client = getClient()) {
+ return client.client.getNextNotification(lastEventId, maxEvents, filter);
+ } catch (Exception e) {
+ LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e);
+ throw new MetastoreNotificationFetchException(
+ "Failed to get next notification based on last event id: " + lastEventId + ". msg: " + e
+ .getMessage());
+ }
+ }
+
private class CachedClient implements AutoCloseable {
private final IMetaStoreClient client;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
new file mode 100644
index 0000000000..8647e47b78
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for DROP_TABLE event type
+ */
+public class DropTableEvent extends MetastoreTableEvent {
+ private static final Logger LOG = LogManager.getLogger(DropTableEvent.class);
+ private final String dbName;
+ private final String tableName;
+
+ private DropTableEvent(NotificationEvent event,
+ String catalogName) {
+ super(event, catalogName);
+ Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
+ JSONDropTableMessage dropTableMessage =
+ (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
+ .getDropTableMessage(event.getMessage());
+ try {
+ dbName = dropTableMessage.getDB();
+ tableName = dropTableMessage.getTable();
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(debugString(
+ "Could not parse event message. "
+ + "Check if %s is set to true in metastore configuration",
+ MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+ }
+ }
+
+ public static List<MetastoreEvent> getEvents(NotificationEvent event,
+ String catalogName) {
+ return Lists.newArrayList(new DropTableEvent(event, catalogName));
+ }
+
+ @Override
+ protected boolean existInCache() {
+ return true;
+ }
+
+ @Override
+ protected boolean canBeSkipped() {
+ return false;
+ }
+
+ protected boolean isSupported() {
+ return true;
+ }
+
+ @Override
+ protected void process() throws MetastoreNotificationException {
+ try {
+ LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
+ tableName);
+ Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
+ } catch (DdlException e) {
+ LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
+ catalogName, e);
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
new file mode 100644
index 0000000000..333687e2ab
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * Factory interface to generate a {@link MetastoreEvent} from a {@link NotificationEvent} object.
+ */
+public interface EventFactory {
+
+ List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent hmsEvent,
+ String catalogName) throws MetastoreNotificationException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
new file mode 100644
index 0000000000..4d2dc1a178
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * An event type which is ignored. Useful for unsupported metastore event types
+ */
+public class IgnoredEvent extends MetastoreEvent {
+ protected IgnoredEvent(NotificationEvent event, String catalogName) {
+ super(event, catalogName);
+ }
+
+ private static List<MetastoreEvent> getEvents(NotificationEvent event,
+ String catalogName) {
+ return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+ }
+
+ @Override
+ public void process() {
+ debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
new file mode 100644
index 0000000000..5cc4594457
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
+ * process a NotificationEvent received from metastore.
+ */
+public abstract class MetastoreEvent {
+ private static final Logger LOG = LogManager.getLogger(MetastoreEvent.class);
+ // String.format compatible string to prepend event id and type
+ private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s ";
+
+ // logger format compatible string to prepend to a log formatted message
+ private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";
+
+ // the notification received from metastore which is processed by this
+ protected final NotificationEvent event;
+
+ // dbName from the event
+ protected final String dbName;
+
+ // tblName from the event
+ protected final String tblName;
+
+ // eventId of the event. Used instead of calling getter on event everytime
+ private final long eventId;
+
+ // eventType from the NotificationEvent
+ private final MetastoreEventType eventType;
+
+ // Actual notificationEvent object received from Metastore
+ protected final NotificationEvent metastoreNotificationEvent;
+
+ protected final String catalogName;
+
+ protected MetastoreEvent(NotificationEvent event, String catalogName) {
+ this.event = event;
+ this.dbName = event.getDbName();
+ this.tblName = event.getTableName();
+ this.eventId = event.getEventId();
+ this.eventType = MetastoreEventType.from(event.getEventType());
+ this.metastoreNotificationEvent = event;
+ this.catalogName = catalogName;
+ }
+
+ public long getEventId() {
+ return eventId;
+ }
+
+ public MetastoreEventType getEventType() {
+ return eventType;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTblName() {
+ return tblName;
+ }
+
+ /**
+ * Checks if the given event can be batched into this event. Default behavior is
+ * to return false which can be overridden by a sub-class.
+ * The current version is relatively simple to process batch events, so all that need to be processed are true.
+ *
+ * @param event The event under consideration to be batched into this event.
+ * @return false if event cannot be batched into this event; otherwise true.
+ */
+ protected boolean canBeBatched(MetastoreEvent event) {
+ return false;
+ }
+
+ /**
+ * Adds the given event into the batch of events represented by this event. Default
+ * implementation is to return null. Sub-classes must override this method to
+ * implement batching.
+ *
+ * @param event The event which needs to be added to the batch.
+ * @return The batch event which represents all the events batched into this event
+ * until now including the given event.
+ */
+ protected MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+ return null;
+ }
+
+
+ protected boolean existInCache() throws MetastoreNotificationException {
+ return false;
+ }
+
+ /**
+ * Returns the number of events represented by this event. For most events this is 1.
+ * In case of batch events this could be more than 1.
+ */
+ protected int getNumberOfEvents() {
+ return 1;
+ }
+
+ /**
+ * Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore
+ * some events because they do not affect query results.
+ *
+ * @return true if this event can be skipped.
+ */
+ protected boolean canBeSkipped() {
+ return false;
+ }
+
+ /**
+ * Whether the current version of FE supports processing of some events, some events are reserved,
+ * and may be processed later version.
+ */
+ protected boolean isSupported() {
+ return false;
+ }
+
+ /**
+ * Process the information available in the NotificationEvent.
+ */
+ protected abstract void process() throws MetastoreNotificationException;
+
+ /**
+ * Helper method to get debug string with helpful event information prepended to the
+ * message. This can be used to generate helpful exception messages
+ *
+ * @param msgFormatString String value to be used in String.format() for the given message
+ * @param args args to the <code>String.format()</code> for the given msgFormatString
+ */
+ protected String debugString(String msgFormatString, Object... args) {
+ String formatString = STR_FORMAT_EVENT_ID_TYPE + msgFormatString;
+ Object[] formatArgs = getLogFormatArgs(args);
+ return String.format(formatString, formatArgs);
+ }
+
+ /**
+ * Helper method to generate the format args after prepending the event id and type
+ */
+ private Object[] getLogFormatArgs(Object[] args) {
+ Object[] formatArgs = new Object[args.length + 2];
+ formatArgs[0] = getEventId();
+ formatArgs[1] = getEventType();
+ int i = 2;
+ for (Object arg : args) {
+ formatArgs[i] = arg;
+ i++;
+ }
+ return formatArgs;
+ }
+
+ /**
+ * Logs at info level the given log formatted string and its args. The log formatted
+ * string should have {} pair at the appropriate location in the string for each arg
+ * value provided. This method prepends the event id and event type before logging the
+ * message. No-op if the log level is not at INFO
+ */
+ protected void infoLog(String logFormattedStr, Object... args) {
+ if (!LOG.isInfoEnabled()) {
+ return;
+ }
+ String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
+ Object[] formatArgs = getLogFormatArgs(args);
+ LOG.info(formatString, formatArgs);
+ }
+
+ /**
+ * Similar to infoLog excepts logs at debug level
+ */
+ protected void debugLog(String logFormattedStr, Object... args) {
+ if (!LOG.isDebugEnabled()) {
+ return;
+ }
+ String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
+ Object[] formatArgs = getLogFormatArgs(args);
+ LOG.debug(formatString, formatArgs);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
new file mode 100644
index 0000000000..2719158c8e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Factory class to create various MetastoreEvents.
+ */
+public class MetastoreEventFactory implements EventFactory {
+ private static final Logger LOG = LogManager.getLogger(MetastoreEventFactory.class);
+
+ @Override
+ public List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent event,
+ String catalogName) {
+ Preconditions.checkNotNull(event.getEventType());
+ MetastoreEventType metastoreEventType = MetastoreEventType.from(event.getEventType());
+ switch (metastoreEventType) {
+ case DROP_TABLE:
+ return DropTableEvent.getEvents(event, catalogName);
+ default:
+ // ignore all the unknown events by creating a IgnoredEvent
+ return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+ }
+ }
+
+ List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+ List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
+
+ for (NotificationEvent event : events) {
+ metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event, hmsExternalCatalog.getName()));
+ }
+
+ List<MetastoreEvent> tobeProcessEvents = metastoreEvents.stream()
+ .filter(MetastoreEvent::isSupported)
+ .collect(Collectors.toList());
+
+ if (tobeProcessEvents.isEmpty()) {
+ LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName());
+ return Collections.emptyList();
+ }
+
+ return createBatchEvents(tobeProcessEvents);
+ }
+
+ /**
+ * Create batch event tasks according to HivePartitionName to facilitate subsequent parallel processing.
+ * For ADD_PARTITION and DROP_PARTITION, we directly override any events before that partition.
+ * For a partition, it is meaningless to process any events before the drop partition.
+ */
+ List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
+ // now do nothing
+ return events;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
new file mode 100644
index 0000000000..31dce29366
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Currently we only support handling some events.
+ */
+public enum MetastoreEventType {
+ CREATE_TABLE("CREATE_TABLE"),
+ DROP_TABLE("DROP_TABLE"),
+ ALTER_TABLE("ALTER_TABLE"),
+ CREATE_DATABASE("CREATE_DATABASE"),
+ DROP_DATABASE("DROP_DATABASE"),
+ ALTER_DATABASE("ALTER_DATABASE"),
+ ADD_PARTITION("ADD_PARTITION"),
+ ALTER_PARTITION("ALTER_PARTITION"),
+ ALTER_PARTITIONS("ALTER_PARTITIONS"),
+ DROP_PARTITION("DROP_PARTITION"),
+ INSERT("INSERT"),
+ INSERT_PARTITIONS("INSERT_PARTITIONS"),
+ ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
+ COMMIT_TXN("COMMIT_TXN"),
+ ABORT_TXN("ABORT_TXN"),
+ OTHER("OTHER");
+
+ private final String eventType;
+
+ MetastoreEventType(String msEventType) {
+ this.eventType = msEventType;
+ }
+
+ @Override
+ public String toString() {
+ return eventType;
+ }
+
+ /**
+ * Returns the MetastoreEventType from a given string value of event from Metastore's
+ * NotificationEvent.eventType. If none of the supported MetastoreEventTypes match,
+ * return OTHER
+ *
+ * @param eventType EventType value from the {@link org.apache.hadoop.hive.metastore.api.NotificationEvent}
+ */
+ public static MetastoreEventType from(String eventType) {
+ for (MetastoreEventType metastoreEventType : values()) {
+ if (metastoreEventType.eventType.equalsIgnoreCase(eventType)) {
+ return metastoreEventType;
+ }
+ }
+ return OTHER;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
new file mode 100644
index 0000000000..1ff3bd98b2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A metastore event is a instance of the class
+ * {@link NotificationEvent}. Metastore can be
+ * configured, to work with Listeners which are called on various DDL operations like
+ * create/alter/drop operations on database, table, partition etc. Each event has a unique
+ * incremental id and the generated events are be fetched from Metastore to get
+ * incremental updates to the metadata stored in Hive metastore using the the public API
+ * <code>get_next_notification</code> These events could be generated by external
+ * Metastore clients like Apache Hive or Apache Spark configured to talk with the same metastore.
+ * <p>
+ * This class is used to poll metastore for such events at a given frequency. By observing
+ * such events, we can take appropriate action on the {@link org.apache.doris.datasource.hive.HiveMetaStoreCache}
+ * (refresh/invalidate/add/remove) so that represents the latest information
+ * available in metastore. We keep track of the last synced event id in each polling
+ * iteration so the next batch can be requested appropriately. The current batch size is
+ * constant and set to {@link org.apache.doris.common.Config#hms_events_batch_size_per_rpc}.
+ */
+public class MetastoreEventsProcessor extends MasterDaemon {
+ private static final Logger LOG = LogManager.getLogger(MetastoreEventsProcessor.class);
+ public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
+ "hive.metastore.notifications.add.thrift.objects";
+
+ // for deserializing from JSON strings from metastore event
+ private static final MessageDeserializer MESSAGE_DESERIALIZER = new JSONMessageDeserializer();
+
+
+ // event factory which is used to get or create MetastoreEvents
+ private final MetastoreEventFactory metastoreEventFactory;
+
+ private boolean isRunning;
+
+ public MetastoreEventsProcessor() {
+ super(MetastoreEventsProcessor.class.getName(), Config.hms_events_polling_interval_ms);
+ this.metastoreEventFactory = new MetastoreEventFactory();
+ this.isRunning = false;
+ }
+
+ /**
+ * Fetch the next batch of NotificationEvents from metastore. The default batch size is
+ * <code>{@link Config#hms_events_batch_size_per_rpc}</code>
+ */
+ private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) {
+ LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName());
+ NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog);
+
+ if (response == null) {
+ return Collections.emptyList();
+ }
+ return response.getEvents();
+ }
+
+ private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+ for (MetastoreEvent event : events) {
+ try {
+ event.process();
+ } catch (Exception e) {
+ hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Process the given list of notification events. Useful for tests which provide a list of events
+ */
+ private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+ //transfer
+ List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
+ doExecute(metastoreEvents, hmsExternalCatalog);
+ hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId());
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ if (isRunning) {
+ LOG.warn("Last task not finished,ignore current task.");
+ return;
+ }
+ isRunning = true;
+ try {
+ realRun();
+ } catch (Exception ex) {
+ LOG.warn("Task failed", ex);
+ }
+ isRunning = false;
+ }
+
+ private void realRun() {
+ List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
+ for (Long catalogId : catalogIds) {
+ CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+ if (catalog instanceof HMSExternalCatalog) {
+ HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
+ List<NotificationEvent> events = Collections.emptyList();
+ try {
+ events = getNextHMSEvents(hmsExternalCatalog);
+ if (!events.isEmpty()) {
+ LOG.info("Events size are {} on catalog [{}]", events.size(),
+ hmsExternalCatalog.getName());
+ processEvents(events, hmsExternalCatalog);
+ }
+ } catch (MetastoreNotificationFetchException e) {
+ LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
+ } catch (Exception ex) {
+ LOG.warn("Failed to process hive metastore [{}] events .",
+ hmsExternalCatalog.getName(), ex);
+ }
+ }
+ }
+ }
+
+ public static MessageDeserializer getMessageDeserializer() {
+ return MESSAGE_DESERIALIZER;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
new file mode 100644
index 0000000000..2bd5c4c40c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Utility exception class to be thrown for errors during event processing
+ */
+public class MetastoreNotificationException extends RuntimeException {
+
+ public MetastoreNotificationException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public MetastoreNotificationException(String msg) {
+ super(msg);
+ }
+
+ public MetastoreNotificationException(Exception e) {
+ super(e);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
new file mode 100644
index 0000000000..487165eeca
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Utility exception class to be thrown for errors during event processing
+ */
+public class MetastoreNotificationFetchException extends MetastoreNotificationException {
+
+ public MetastoreNotificationFetchException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public MetastoreNotificationFetchException(String msg) {
+ super(msg);
+ }
+
+ public MetastoreNotificationFetchException(Exception e) {
+ super(e);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
new file mode 100644
index 0000000000..70f56bdbb0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * Base class for all the table events
+ */
+public abstract class MetastoreTableEvent extends MetastoreEvent {
+
+
+ protected MetastoreTableEvent(NotificationEvent event, String catalogName) {
+ super(event, catalogName);
+ Preconditions.checkNotNull(dbName, "Database name cannot be null");
+ Preconditions.checkNotNull(tblName, "Table name cannot be null");
+ }
+
+ /**
+ * Returns a list of parameters that are set by Hive for tables/partitions that can be
+ * ignored to determine if the alter table/partition event is a trivial one.
+ */
+ private static final List<String> PARAMETERS_TO_IGNORE =
+ new ImmutableList.Builder<String>()
+ .add("transient_lastDdlTime")
+ .add("numFilesErasureCoded")
+ .add("numFiles")
+ .add("comment")
+ .build();
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 96ae663015..fb8fbb5b99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -706,6 +706,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_REFRESH_EXTERNAL_DB:
+ case OperationType.OP_DROP_EXTERNAL_TABLE:
case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
data = ExternalObjectLog.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 217c0c117a..9254583294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -949,6 +949,11 @@ public class EditLog {
env.getCatalogMgr().replayRefreshExternalTable(log);
break;
}
+ case OperationType.OP_DROP_EXTERNAL_TABLE: {
+ final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+ env.getCatalogMgr().replayDropExternalTable(log);
+ break;
+ }
case OperationType.OP_INIT_EXTERNAL_TABLE: {
// Do nothing.
break;
@@ -1624,6 +1629,10 @@ public class EditLog {
logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
}
+ public void logDropExternalTable(ExternalObjectLog log) {
+ logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
+ }
+
public Journal getJournal() {
return this.journal;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 6204fc1836..c5889a8001 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -255,6 +255,8 @@ public class OperationType {
public static final short OP_DROP_MTMV_TASK = 341;
public static final short OP_ALTER_MTMV_TASK = 342;
+ public static final short OP_DROP_EXTERNAL_TABLE = 350;
+
public static final short OP_ALTER_USER = 400;
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org