You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/03 05:59:19 UTC

[doris] branch master updated: [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 893f5f9345 [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)
893f5f9345 is described below

commit 893f5f93455883d3fab681e95afc47363eeeee25
Author: zhangdong <49...@qq.com>
AuthorDate: Tue Jan 3 13:59:14 2023 +0800

    [feature-wip](multi-catalog) support automatic sync hive metastore events (#15401)
    
    Poll the metastore at a given frequency for create/alter/drop events on databases, tables and partitions.
    By observing such events, we can take the appropriate action (refresh/invalidate/add/remove) on the
    cached metadata so that it reflects the latest information available in the metastore.
    We keep track of the last synced event id in each polling
    iteration so the next batch can be requested appropriately.
---
 .../java/org/apache/doris/catalog/DatabaseIf.java  |   2 +
 .../main/java/org/apache/doris/catalog/Env.java    |   7 +
 .../doris/catalog/external/ExternalDatabase.java   |   5 +
 .../catalog/external/HMSExternalDatabase.java      |  10 +
 .../main/java/org/apache/doris/common/Config.java  |  18 ++
 .../org/apache/doris/datasource/CatalogMgr.java    |  53 +++++-
 .../apache/doris/datasource/ExternalCatalog.java   |   3 +
 .../doris/datasource/HMSExternalCatalog.java       |  54 ++++++
 .../datasource/PooledHiveMetaStoreClient.java      |  27 +++
 .../datasource/hive/event/DropTableEvent.java      |  89 +++++++++
 .../doris/datasource/hive/event/EventFactory.java  |  32 ++++
 .../doris/datasource/hive/event/IgnoredEvent.java  |  43 +++++
 .../datasource/hive/event/MetastoreEvent.java      | 203 +++++++++++++++++++++
 .../hive/event/MetastoreEventFactory.java          |  81 ++++++++
 .../datasource/hive/event/MetastoreEventType.java  |  68 +++++++
 .../hive/event/MetastoreEventsProcessor.java       | 151 +++++++++++++++
 .../hive/event/MetastoreNotificationException.java |  37 ++++
 .../event/MetastoreNotificationFetchException.java |  37 ++++
 .../datasource/hive/event/MetastoreTableEvent.java |  50 +++++
 .../org/apache/doris/journal/JournalEntity.java    |   1 +
 .../java/org/apache/doris/persist/EditLog.java     |   9 +
 .../org/apache/doris/persist/OperationType.java    |   2 +
 22 files changed, 976 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
index ffd271d84e..f0dfa53484 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java
@@ -214,4 +214,6 @@ public interface DatabaseIf<T extends TableIf> {
         }
         return (OlapTable) table;
     }
+
+    void dropTable(String tableName);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1d95551df8..83bde25845 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -126,6 +126,7 @@ import org.apache.doris.datasource.CatalogMgr;
 import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
 import org.apache.doris.deploy.DeployManager;
 import org.apache.doris.deploy.impl.AmbariDeployManager;
 import org.apache.doris.deploy.impl.K8sDeployManager;
@@ -316,6 +317,7 @@ public class Env {
     private DeleteHandler deleteHandler;
     private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
     private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
+    private MetastoreEventsProcessor metastoreEventsProcessor;
 
     private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
     private MasterDaemon txnCleaner; // To clean aborted or timeout txns
@@ -549,6 +551,7 @@ public class Env {
         this.deleteHandler = new DeleteHandler();
         this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
         this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector();
+        this.metastoreEventsProcessor = new MetastoreEventsProcessor();
 
         this.replayedJournalId = new AtomicLong(0L);
         this.isElectable = false;
@@ -1402,6 +1405,10 @@ public class Env {
         if (Config.enable_fqdn_mode) {
             fqdnManager.start();
         }
+        if (Config.enable_hms_events_incremental_sync) {
+            metastoreEventsProcessor.start();
+        }
+
     }
 
     // start threads that should running on all FE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index 6ae8594c07..65c027713e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -258,4 +258,9 @@ public class ExternalDatabase<T extends ExternalTable> implements DatabaseIf<T>,
 
     @Override
     public void gsonPostProcess() throws IOException {}
+
+    @Override
+    public void dropTable(String tableName) {
+        throw new NotImplementedException();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index decef86caa..a1f6bcddab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -170,4 +170,14 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> impl
         idToTbl.put(tbl.getId(), tbl);
         tableNameToId.put(tbl.getName(), tbl.getId());
     }
+
+    @Override
+    public void dropTable(String tableName) {
+        LOG.debug("drop table [{}]", tableName);
+        Long tableId = tableNameToId.remove(tableName);
+        if (tableId == null) {
+            LOG.warn("drop table [{}] failed", tableName);
+        }
+        idToTbl.remove(tableId);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 2bc30113e6..93fbfc6e18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1935,5 +1935,23 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true)
     public static boolean enable_func_pushdown = true;
+
+    /**
+     * If set to true, doris will automatically synchronize hms metadata to the cache in fe.
+     */
+    @ConfField(masterOnly = true)
+    public static boolean enable_hms_events_incremental_sync = false;
+
+    /**
+     * Maximum number of events to poll in each RPC.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int hms_events_batch_size_per_rpc = 500;
+
+    /**
+     * HMS polling interval in milliseconds.
+     */
+    @ConfField(masterOnly = true)
+    public static int hms_events_polling_interval_ms = 20000;
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index a62562acd7..3f9a76bb7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Resource.ReferenceType;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.cluster.ClusterNamespace;
@@ -439,13 +440,17 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
      * Refresh the catalog meta and write the meta log.
      */
     public void refreshCatalog(RefreshCatalogStmt stmt) throws UserException {
+        CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
+        }
+        CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
+        refreshCatalog(log);
+    }
+
+    public void refreshCatalog(CatalogLog log) {
         writeLock();
         try {
-            CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
-            if (catalog == null) {
-                throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
-            }
-            CatalogLog log = CatalogFactory.constructorCatalogLog(catalog.getId(), stmt);
             replayRefreshCatalog(log);
             Env.getCurrentEnv().getEditLog().logCatalogLog(OperationType.OP_REFRESH_CATALOG, log);
         } finally {
@@ -481,7 +486,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
     /**
      * Reply for refresh catalog event.
      */
-    public void replayRefreshCatalog(CatalogLog log) throws DdlException {
+    public void replayRefreshCatalog(CatalogLog log) {
         writeLock();
         try {
             unprotectedRefreshCatalog(log.getCatalogId(), log.isInvalidCache());
@@ -554,6 +559,42 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
                 .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
     }
 
+    public void dropExternalTable(String dbName, String tableName, String catalogName) throws DdlException {
+        CatalogIf catalog = nameToCatalog.get(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + catalogName);
+        }
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new DdlException("Only support drop ExternalCatalog Tables");
+        }
+        DatabaseIf db = catalog.getDbNullable(dbName);
+        if (db == null) {
+            throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName());
+        }
+
+        TableIf table = db.getTableNullable(tableName);
+        if (table == null) {
+            throw new DdlException("Table " + tableName + " does not exist in db " + dbName);
+        }
+        ExternalObjectLog log = new ExternalObjectLog();
+        log.setCatalogId(catalog.getId());
+        log.setDbId(db.getId());
+        log.setTableId(table.getId());
+        replayDropExternalTable(log);
+        Env.getCurrentEnv().getEditLog().logDropExternalTable(log);
+    }
+
+    public void replayDropExternalTable(ExternalObjectLog log) {
+        LOG.debug("ReplayDropExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}]", log.getCatalogId(), log.getDbId(),
+                log.getTableId());
+        ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        db.dropTable(table.getName());
+        Env.getCurrentEnv().getExtMetaCacheMgr()
+                .invalidateTableCache(catalog.getId(), db.getFullName(), table.getName());
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 2d68d28fee..e730fb6bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -295,6 +295,9 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
             db.setTableExtCatalog(this);
         }
         objectCreated = false;
+        if (this instanceof HMSExternalCatalog) {
+            ((HMSExternalCatalog) this).setLastSyncedEventId(-1L);
+        }
     }
 
     public void addDatabaseForTest(ExternalDatabase db) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index e2a5f2f3a8..ec94867fc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -25,14 +25,20 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
@@ -42,8 +48,12 @@ import java.util.Map;
  * External catalog for hive metastore compatible data sources.
  */
 public class HMSExternalCatalog extends ExternalCatalog {
+    private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class);
+
     private static final int MAX_CLIENT_POOL_SIZE = 8;
     protected PooledHiveMetaStoreClient client;
+    // Record the latest synced event id when processing hive events
+    private long lastSyncedEventId;
 
     /**
      * Default constructor for HMSExternalCatalog.
@@ -170,4 +180,48 @@ public class HMSExternalCatalog extends ExternalCatalog {
         }
         return tmpSchema;
     }
+
+    public void setLastSyncedEventId(long lastSyncedEventId) {
+        this.lastSyncedEventId = lastSyncedEventId;
+    }
+
+    public NotificationEventResponse getNextEventResponse(HMSExternalCatalog hmsExternalCatalog)
+            throws MetastoreNotificationFetchException {
+        makeSureInitialized();
+        if (lastSyncedEventId < 0) {
+            lastSyncedEventId = getCurrentEventId();
+            refreshCatalog(hmsExternalCatalog);
+            LOG.info(
+                    "First pulling events on catalog [{}],refreshCatalog and init lastSyncedEventId,"
+                            + "lastSyncedEventId is [{}]",
+                    hmsExternalCatalog.getName(), lastSyncedEventId);
+            return null;
+        }
+
+        long currentEventId = getCurrentEventId();
+        LOG.debug("Catalog [{}] getNextEventResponse, currentEventId is {},lastSyncedEventId is {}",
+                hmsExternalCatalog.getName(), currentEventId, lastSyncedEventId);
+        if (currentEventId == lastSyncedEventId) {
+            LOG.info("Event id not updated when pulling events on catalog [{}]", hmsExternalCatalog.getName());
+            return null;
+        }
+        return client.getNextNotification(lastSyncedEventId, Config.hms_events_batch_size_per_rpc, null);
+    }
+
+    private void refreshCatalog(HMSExternalCatalog hmsExternalCatalog) {
+        CatalogLog log = new CatalogLog();
+        log.setCatalogId(hmsExternalCatalog.getId());
+        log.setInvalidCache(true);
+        Env.getCurrentEnv().getCatalogMgr().refreshCatalog(log);
+    }
+
+    private long getCurrentEventId() {
+        makeSureInitialized();
+        CurrentNotificationEventId currentNotificationEventId = client.getCurrentNotificationEventId();
+        if (currentNotificationEventId == null) {
+            LOG.warn("Get currentNotificationEventId is null");
+            return -1;
+        }
+        return currentNotificationEventId.getEventId();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
index 05e3d9d15c..008253c450 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource;
 
 import org.apache.doris.catalog.HMSResource;
 import org.apache.doris.common.Config;
+import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
 
 import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.google.common.base.Preconditions;
@@ -29,8 +30,10 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.logging.log4j.LogManager;
@@ -145,6 +148,30 @@ public class PooledHiveMetaStoreClient {
         }
     }
 
+    public CurrentNotificationEventId getCurrentNotificationEventId() {
+        try (CachedClient client = getClient()) {
+            return client.client.getCurrentNotificationEventId();
+        } catch (Exception e) {
+            LOG.warn("Failed to fetch current notification event id", e);
+            throw new MetastoreNotificationFetchException(
+                    "Failed to get current notification event id. msg: " + e.getMessage());
+        }
+    }
+
+    public NotificationEventResponse getNextNotification(long lastEventId,
+            int maxEvents,
+            IMetaStoreClient.NotificationFilter filter)
+            throws MetastoreNotificationFetchException {
+        try (CachedClient client = getClient()) {
+            return client.client.getNextNotification(lastEventId, maxEvents, filter);
+        } catch (Exception e) {
+            LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e);
+            throw new MetastoreNotificationFetchException(
+                    "Failed to get next notification based on last event id: " + lastEventId + ". msg: " + e
+                            .getMessage());
+        }
+    }
+
     private class CachedClient implements AutoCloseable {
         private final IMetaStoreClient client;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
new file mode 100644
index 0000000000..8647e47b78
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONDropTableMessage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * MetastoreEvent for DROP_TABLE event type
+ */
+public class DropTableEvent extends MetastoreTableEvent {
+    private static final Logger LOG = LogManager.getLogger(DropTableEvent.class);
+    private final String dbName;
+    private final String tableName;
+
+    private DropTableEvent(NotificationEvent event,
+            String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkArgument(MetastoreEventType.DROP_TABLE.equals(getEventType()));
+        JSONDropTableMessage dropTableMessage =
+                (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer()
+                        .getDropTableMessage(event.getMessage());
+        try {
+            dbName = dropTableMessage.getDB();
+            tableName = dropTableMessage.getTable();
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(debugString(
+                    "Could not parse event message. "
+                            + "Check if %s is set to true in metastore configuration",
+                    MetastoreEventsProcessor.HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY), e);
+        }
+    }
+
+    public static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new DropTableEvent(event, catalogName));
+    }
+
+    @Override
+    protected boolean existInCache() {
+        return true;
+    }
+
+    @Override
+    protected boolean canBeSkipped() {
+        return false;
+    }
+
+    protected boolean isSupported() {
+        return true;
+    }
+
+    @Override
+    protected void process() throws MetastoreNotificationException {
+        try {
+            LOG.info("DropTable event process,catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName,
+                    tableName);
+            Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName);
+        } catch (DdlException e) {
+            LOG.warn("DropExternalTable failed,dbName:[{}],tableName:[{}],catalogName:[{}].", dbName, tableName,
+                    catalogName, e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
new file mode 100644
index 0000000000..333687e2ab
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/EventFactory.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * Factory interface to generate a {@link MetastoreEvent} from a {@link NotificationEvent} object.
+ */
+public interface EventFactory {
+
+    List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent hmsEvent,
+            String catalogName) throws MetastoreNotificationException;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
new file mode 100644
index 0000000000..4d2dc1a178
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * An event type which is ignored. Useful for unsupported metastore event types
+ */
+public class IgnoredEvent extends MetastoreEvent {
+    protected IgnoredEvent(NotificationEvent event, String catalogName) {
+        super(event, catalogName);
+    }
+
+    private static List<MetastoreEvent> getEvents(NotificationEvent event,
+            String catalogName) {
+        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+    }
+
+    @Override
+    public void process() {
+        debugLog("Ignoring unknown event type " + metastoreNotificationEvent.getEventType());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
new file mode 100644
index 0000000000..5cc4594457
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract base class for all MetastoreEvents. A MetastoreEvent is an object used to
+ * process a NotificationEvent received from metastore.
+ */
+public abstract class MetastoreEvent {
+    private static final Logger LOG = LogManager.getLogger(MetastoreEvent.class);
+    // String.format compatible string to prepend event id and type
+    private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s ";
+
+    // logger format compatible string to prepend to a log formatted message
+    private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} ";
+
+    // the notification received from metastore which is processed by this
+    protected final NotificationEvent event;
+
+    // dbName from the event
+    protected final String dbName;
+
+    // tblName from the event
+    protected final String tblName;
+
+    // eventId of the event. Used instead of calling getter on event everytime
+    private final long eventId;
+
+    // eventType from the NotificationEvent
+    private final MetastoreEventType eventType;
+
+    // Actual notificationEvent object received from Metastore
+    protected final NotificationEvent metastoreNotificationEvent;
+
+    protected final String catalogName;
+
+    protected MetastoreEvent(NotificationEvent event, String catalogName) {
+        this.event = event;
+        this.dbName = event.getDbName();
+        this.tblName = event.getTableName();
+        this.eventId = event.getEventId();
+        this.eventType = MetastoreEventType.from(event.getEventType());
+        this.metastoreNotificationEvent = event;
+        this.catalogName = catalogName;
+    }
+
+    public long getEventId() {
+        return eventId;
+    }
+
+    public MetastoreEventType getEventType() {
+        return eventType;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTblName() {
+        return tblName;
+    }
+
+    /**
+     * Checks if the given event can be batched into this event. Default behavior is
+     * to return false which can be overridden by a sub-class.
+     * The current version is relatively simple to process batch events, so all that need to be processed are true.
+     *
+     * @param event The event under consideration to be batched into this event.
+     * @return false if event cannot be batched into this event; otherwise true.
+     */
+    protected boolean canBeBatched(MetastoreEvent event) {
+        return false;
+    }
+
+    /**
+     * Adds the given event into the batch of events represented by this event. Default
+     * implementation is to return null. Sub-classes must override this method to
+     * implement batching.
+     *
+     * @param event The event which needs to be added to the batch.
+     * @return The batch event which represents all the events batched into this event
+     * until now including the given event.
+     */
+    protected MetastoreEvent addToBatchEvents(MetastoreEvent event) {
+        return null;
+    }
+
+
+    protected boolean existInCache() throws MetastoreNotificationException {
+        return false;
+    }
+
+    /**
+     * Returns the number of events represented by this event. For most events this is 1.
+     * In case of batch events this could be more than 1.
+     */
+    protected int getNumberOfEvents() {
+        return 1;
+    }
+
+    /**
+     * Certain events like ALTER_TABLE or ALTER_PARTITION implement logic to ignore
+     * some events because they do not affect query results.
+     *
+     * @return true if this event can be skipped.
+     */
+    protected boolean canBeSkipped() {
+        return false;
+    }
+
+    /**
+     * Whether the current version of FE supports processing this event. Some events are reserved
+     * and may be processed in a later version.
+     */
+    protected boolean isSupported() {
+        return false;
+    }
+
+    /**
+     * Process the information available in the NotificationEvent.
+     */
+    protected abstract void process() throws MetastoreNotificationException;
+
+    /**
+     * Helper method to get debug string with helpful event information prepended to the
+     * message. This can be used to generate helpful exception messages
+     *
+     * @param msgFormatString String value to be used in String.format() for the given message
+     * @param args args to the <code>String.format()</code> for the given msgFormatString
+     */
+    protected String debugString(String msgFormatString, Object... args) {
+        String formatString = STR_FORMAT_EVENT_ID_TYPE + msgFormatString;
+        Object[] formatArgs = getLogFormatArgs(args);
+        return String.format(formatString, formatArgs);
+    }
+
+    /**
+     * Helper method to generate the format args after prepending the event id and type
+     */
+    private Object[] getLogFormatArgs(Object[] args) {
+        Object[] formatArgs = new Object[args.length + 2];
+        formatArgs[0] = getEventId();
+        formatArgs[1] = getEventType();
+        int i = 2;
+        for (Object arg : args) {
+            formatArgs[i] = arg;
+            i++;
+        }
+        return formatArgs;
+    }
+
+    /**
+     * Logs at info level the given log formatted string and its args. The log formatted
+     * string should have {} pair at the appropriate location in the string for each arg
+     * value provided. This method prepends the event id and event type before logging the
+     * message. No-op if the log level is not at INFO
+     */
+    protected void infoLog(String logFormattedStr, Object... args) {
+        if (!LOG.isInfoEnabled()) {
+            return;
+        }
+        String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
+        Object[] formatArgs = getLogFormatArgs(args);
+        LOG.info(formatString, formatArgs);
+    }
+
+    /**
+     * Similar to infoLog, except that it logs at debug level
+     */
+    protected void debugLog(String logFormattedStr, Object... args) {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        String formatString = LOG_FORMAT_EVENT_ID_TYPE + logFormattedStr;
+        Object[] formatArgs = getLogFormatArgs(args);
+        LOG.debug(formatString, formatArgs);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId, eventType);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
new file mode 100644
index 0000000000..2719158c8e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Converts raw Hive metastore {@link NotificationEvent}s into {@link MetastoreEvent} objects.
+ */
+public class MetastoreEventFactory implements EventFactory {
+    private static final Logger LOG = LogManager.getLogger(MetastoreEventFactory.class);
+
+    /**
+     * Maps one notification event to its metastore event(s). DROP_TABLE is delegated to
+     * {@link DropTableEvent}; every other event type is wrapped in an {@link IgnoredEvent}.
+     *
+     * @param event the notification event; its event type must not be null
+     * @param catalogName name of the catalog the event belongs to
+     */
+    @Override
+    public List<MetastoreEvent> transferNotificationEventToMetastoreEvents(NotificationEvent event,
+            String catalogName) {
+        String rawEventType = event.getEventType();
+        Preconditions.checkNotNull(rawEventType);
+        if (MetastoreEventType.from(rawEventType) == MetastoreEventType.DROP_TABLE) {
+            return DropTableEvent.getEvents(event, catalogName);
+        }
+        // ignore all the unknown events by creating a IgnoredEvent
+        return Lists.newArrayList(new IgnoredEvent(event, catalogName));
+    }
+
+    /**
+     * Converts every notification event, keeps only the events reporting
+     * {@link MetastoreEvent#isSupported()}, and passes them through {@link #createBatchEvents}.
+     * Returns an empty list (and logs it) when nothing is left to process.
+     */
+    List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+        // Phase 1: convert all notifications before looking at any of the results.
+        List<MetastoreEvent> converted = events.stream()
+                .map(notification ->
+                        transferNotificationEventToMetastoreEvents(notification, hmsExternalCatalog.getName()))
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+
+        // Phase 2: drop the events that are not supported.
+        List<MetastoreEvent> supported = Lists.newArrayList();
+        for (MetastoreEvent candidate : converted) {
+            if (candidate.isSupported()) {
+                supported.add(candidate);
+            }
+        }
+
+        if (!supported.isEmpty()) {
+            return createBatchEvents(supported);
+        }
+        LOG.info("The metastore events to process is empty on catalog {}", hmsExternalCatalog.getName());
+        return Collections.emptyList();
+    }
+
+    /**
+     * Intended to create batch event tasks according to HivePartitionName to facilitate
+     * subsequent parallel processing: for ADD_PARTITION and DROP_PARTITION, any earlier events
+     * on that partition would be overridden, since processing events that precede a drop
+     * partition is meaningless. Not implemented yet: the input list is returned unchanged.
+     */
+    List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
+        // now do nothing
+        return events;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
new file mode 100644
index 0000000000..31dce29366
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventType.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Event types that can appear in a Hive metastore notification.
+ * Currently we only support handling some events.
+ */
+public enum MetastoreEventType {
+    CREATE_TABLE("CREATE_TABLE"),
+    DROP_TABLE("DROP_TABLE"),
+    ALTER_TABLE("ALTER_TABLE"),
+    CREATE_DATABASE("CREATE_DATABASE"),
+    DROP_DATABASE("DROP_DATABASE"),
+    ALTER_DATABASE("ALTER_DATABASE"),
+    ADD_PARTITION("ADD_PARTITION"),
+    ALTER_PARTITION("ALTER_PARTITION"),
+    ALTER_PARTITIONS("ALTER_PARTITIONS"),
+    DROP_PARTITION("DROP_PARTITION"),
+    INSERT("INSERT"),
+    INSERT_PARTITIONS("INSERT_PARTITIONS"),
+    ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"),
+    COMMIT_TXN("COMMIT_TXN"),
+    ABORT_TXN("ABORT_TXN"),
+    OTHER("OTHER");
+
+    // the string this constant is matched against in from(String) and returned by toString()
+    private final String typeName;
+
+    MetastoreEventType(String msEventType) {
+        typeName = msEventType;
+    }
+
+    @Override
+    public String toString() {
+        return typeName;
+    }
+
+    /**
+     * Resolves the string value of Metastore's NotificationEvent.eventType to a
+     * MetastoreEventType, comparing case-insensitively. Anything that matches none of the
+     * constants (including null) resolves to OTHER.
+     *
+     * @param eventType EventType value from the {@link org.apache.hadoop.hive.metastore.api.NotificationEvent}
+     */
+    public static MetastoreEventType from(String eventType) {
+        MetastoreEventType resolved = OTHER;
+        for (MetastoreEventType candidate : values()) {
+            if (candidate.typeName.equalsIgnoreCase(eventType)) {
+                resolved = candidate;
+                break;
+            }
+        }
+        return resolved;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
new file mode 100644
index 0000000000..1ff3bd98b2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A metastore event is an instance of the class
+ * {@link NotificationEvent}. Metastore can be
+ * configured to work with listeners which are called on various DDL operations like
+ * create/alter/drop operations on database, table, partition etc. Each event has a unique
+ * incremental id, and the generated events can be fetched from Metastore to get
+ * incremental updates to the metadata stored in Hive metastore using the public API
+ * <code>get_next_notification</code>. These events could be generated by external
+ * Metastore clients like Apache Hive or Apache Spark configured to talk with the same metastore.
+ * <p>
+ * This class is used to poll metastore for such events at a given frequency. By observing
+ * such events, we can take appropriate action on the {@link org.apache.doris.datasource.hive.HiveMetaStoreCache}
+ * (refresh/invalidate/add/remove) so that it represents the latest information
+ * available in metastore. We keep track of the last synced event id in each polling
+ * iteration so the next batch can be requested appropriately. The current batch size is
+ * constant and set to {@link org.apache.doris.common.Config#hms_events_batch_size_per_rpc}.
+ */
+public class MetastoreEventsProcessor extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(MetastoreEventsProcessor.class);
+    public static final String HMS_ADD_THRIFT_OBJECTS_IN_EVENTS_CONFIG_KEY =
+            "hive.metastore.notifications.add.thrift.objects";
+
+    // for deserializing from JSON strings from metastore event
+    private static final MessageDeserializer MESSAGE_DESERIALIZER = new JSONMessageDeserializer();
+
+
+    // event factory which is used to get or create MetastoreEvents
+    private final MetastoreEventFactory metastoreEventFactory;
+
+    // true while a polling round is in progress; a new round is skipped while it is set
+    private boolean isRunning;
+
+    /**
+     * Creates the processor, passing this class's name and
+     * {@link Config#hms_events_polling_interval_ms} to the {@link MasterDaemon} constructor.
+     */
+    public MetastoreEventsProcessor() {
+        super(MetastoreEventsProcessor.class.getName(), Config.hms_events_polling_interval_ms);
+        this.metastoreEventFactory = new MetastoreEventFactory();
+        this.isRunning = false;
+    }
+
+    /**
+     * Fetch the next batch of NotificationEvents from metastore. The default batch size is
+     * <code>{@link Config#hms_events_batch_size_per_rpc}</code>
+     *
+     * @return the fetched events; never null. An empty list is returned when the catalog
+     *         gives back no response or a response without an event list.
+     */
+    private List<NotificationEvent> getNextHMSEvents(HMSExternalCatalog hmsExternalCatalog) {
+        LOG.debug("Start to pull events on catalog [{}]", hmsExternalCatalog.getName());
+        NotificationEventResponse response = hmsExternalCatalog.getNextEventResponse(hmsExternalCatalog);
+
+        if (response == null) {
+            return Collections.emptyList();
+        }
+        // The event list of the response may be unset; normalize it so callers can rely on non-null.
+        List<NotificationEvent> fetched = response.getEvents();
+        return fetched == null ? Collections.emptyList() : fetched;
+    }
+
+    /**
+     * Processes the events in order. If one of them fails, the catalog's last synced event id
+     * is set to the id just before the failing event and the exception is rethrown.
+     */
+    private void doExecute(List<MetastoreEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+        for (MetastoreEvent event : events) {
+            try {
+                event.process();
+            } catch (Exception e) {
+                hmsExternalCatalog.setLastSyncedEventId(event.getEventId() - 1);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Converts the given notification events to metastore events, processes them, and then
+     * records the id of the last notification event as the catalog's last synced event id.
+     * Does nothing when the list is empty.
+     */
+    private void processEvents(List<NotificationEvent> events, HMSExternalCatalog hmsExternalCatalog) {
+        if (events.isEmpty()) {
+            // nothing fetched, so there is no "last event" whose id could be recorded
+            return;
+        }
+        //transfer
+        List<MetastoreEvent> metastoreEvents = metastoreEventFactory.getMetastoreEvents(events, hmsExternalCatalog);
+        doExecute(metastoreEvents, hmsExternalCatalog);
+        hmsExternalCatalog.setLastSyncedEventId(events.get(events.size() - 1).getEventId());
+    }
+
+    /**
+     * Runs one polling round unless the previous one is still marked as running.
+     * Exceptions from the round are logged and not propagated.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        if (isRunning) {
+            LOG.warn("Last task not finished,ignore current task.");
+            return;
+        }
+        isRunning = true;
+        try {
+            realRun();
+        } catch (Exception ex) {
+            LOG.warn("Task failed", ex);
+        } finally {
+            // Always clear the flag, even when a non-Exception Throwable escapes; otherwise
+            // every later round would be skipped as "not finished".
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Pulls and processes the next batch of events for every {@link HMSExternalCatalog}
+     * registered in the catalog manager. A failure on one catalog is logged and does not
+     * stop the remaining catalogs from being handled.
+     */
+    private void realRun() {
+        List<Long> catalogIds = Env.getCurrentEnv().getCatalogMgr().getCatalogIds();
+        for (Long catalogId : catalogIds) {
+            CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
+            if (catalog instanceof HMSExternalCatalog) {
+                HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog) catalog;
+                try {
+                    List<NotificationEvent> events = getNextHMSEvents(hmsExternalCatalog);
+                    if (!events.isEmpty()) {
+                        LOG.info("Events size are {} on catalog [{}]", events.size(),
+                                hmsExternalCatalog.getName());
+                        processEvents(events, hmsExternalCatalog);
+                    }
+                } catch (MetastoreNotificationFetchException e) {
+                    LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e);
+                } catch (Exception ex) {
+                    LOG.warn("Failed to process hive metastore [{}] events .",
+                            hmsExternalCatalog.getName(), ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the shared JSON deserializer for metastore event messages.
+     */
+    public static MessageDeserializer getMessageDeserializer() {
+        return MESSAGE_DESERIALIZER;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
new file mode 100644
index 0000000000..2bd5c4c40c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Unchecked exception thrown for errors during metastore event processing.
+ */
+public class MetastoreNotificationException extends RuntimeException {
+
+    /** Creates the exception with a message only. */
+    public MetastoreNotificationException(String msg) {
+        super(msg);
+    }
+
+    /** Creates the exception wrapping the given cause. */
+    public MetastoreNotificationException(Exception e) {
+        super(e);
+    }
+
+    /** Creates the exception with a message and the underlying cause. */
+    public MetastoreNotificationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
new file mode 100644
index 0000000000..487165eeca
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreNotificationFetchException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+/**
+ * Exception thrown for errors while fetching notification events from the metastore.
+ * Being a {@link MetastoreNotificationException} subtype lets callers handle fetch failures
+ * separately from other event processing failures.
+ */
+public class MetastoreNotificationFetchException extends MetastoreNotificationException {
+
+    /** Creates the exception with a message only. */
+    public MetastoreNotificationFetchException(String msg) {
+        super(msg);
+    }
+
+    /** Creates the exception wrapping the given cause. */
+    public MetastoreNotificationFetchException(Exception e) {
+        super(e);
+    }
+
+    /** Creates the exception with a message and the underlying cause. */
+    public MetastoreNotificationFetchException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
new file mode 100644
index 0000000000..70f56bdbb0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.datasource.hive.event;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+import java.util.List;
+
+/**
+ * Base class for all the table events
+ */
+public abstract class MetastoreTableEvent extends MetastoreEvent {
+
+    /**
+     * Parameter keys that are set by Hive for tables/partitions and that can be ignored
+     * when determining whether an alter table/partition event is a trivial one.
+     */
+    private static final List<String> PARAMETERS_TO_IGNORE = ImmutableList.of(
+            "transient_lastDdlTime",
+            "numFilesErasureCoded",
+            "numFiles",
+            "comment");
+
+    /**
+     * Builds the event through the parent constructor and then requires that both the
+     * database name and the table name are non-null.
+     */
+    protected MetastoreTableEvent(NotificationEvent event, String catalogName) {
+        super(event, catalogName);
+        Preconditions.checkNotNull(dbName, "Database name cannot be null");
+        Preconditions.checkNotNull(tblName, "Table name cannot be null");
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 96ae663015..fb8fbb5b99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -706,6 +706,7 @@ public class JournalEntity implements Writable {
                 break;
             }
             case OperationType.OP_REFRESH_EXTERNAL_DB:
+            case OperationType.OP_DROP_EXTERNAL_TABLE:
             case OperationType.OP_REFRESH_EXTERNAL_TABLE: {
                 data = ExternalObjectLog.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 217c0c117a..9254583294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -949,6 +949,11 @@ public class EditLog {
                     env.getCatalogMgr().replayRefreshExternalTable(log);
                     break;
                 }
+                case OperationType.OP_DROP_EXTERNAL_TABLE: {
+                    final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
+                    env.getCatalogMgr().replayDropExternalTable(log);
+                    break;
+                }
                 case OperationType.OP_INIT_EXTERNAL_TABLE: {
                     // Do nothing.
                     break;
@@ -1624,6 +1629,10 @@ public class EditLog {
         logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
     }
 
+    /**
+     * Writes an edit log entry of type OP_DROP_EXTERNAL_TABLE carrying the given log object.
+     */
+    public void logDropExternalTable(ExternalObjectLog log) {
+        logEdit(OperationType.OP_DROP_EXTERNAL_TABLE, log);
+    }
+
     public Journal getJournal() {
         return this.journal;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 6204fc1836..c5889a8001 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -255,6 +255,8 @@ public class OperationType {
     public static final short OP_DROP_MTMV_TASK = 341;
     public static final short OP_ALTER_MTMV_TASK = 342;
 
+    public static final short OP_DROP_EXTERNAL_TABLE = 350;
+
     public static final short OP_ALTER_USER = 400;
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org