You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by vi...@apache.org on 2021/07/22 17:33:13 UTC

[impala] branch master updated: IMPALA-10815: Ignore events on non-default hive catalogs

This is an automated email from the ASF dual-hosted git repository.

vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a7a1cb7  IMPALA-10815: Ignore events on non-default hive catalogs
a7a1cb7 is described below

commit a7a1cb7106699c1d7f852c4d5782e16a7fdba169
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Tue Jul 20 11:45:09 2021 -0700

    IMPALA-10815: Ignore events on non-default hive catalogs
    
    Hive-3 supports a new type in metastore called catalogs. Even
    though impala does not support custom catalogs, it is still possible
    that some external HMS client creates objects within a non-default
    catalog. This can become problematic when the objects within
    the custom catalog match with the name of other objects in the default
    catalog. For example, dropping a custom catalog generates a
    DROP_DATABASE event on the default database of that catalog. When such
    an event is processed, the events processor can remove the default
    database.
    
    This patch adds logic to ignore all the events which are
    generated on such non-default catalog objects. The default value of
    catalog is defined in the hive-site.xml of the metastore client which
    is used by catalogd. If the value is not present it defaults to "hive".
    
    Additionally, it also adds the code to validate that the default
    catalog name defined in the hive-site.xml of the catalogd is the same
    as the one on the metastore server side. If the values do not match,
    the events processor does not come up.
    
    This patch also removes some config validations which are specific
    to hive-2 since we don't support hive-2 anymore in master.
    
    Testing:
    1. Added a new test which creates a custom hive catalog and events
    on it. The test makes sure that such events do not affect objects in
    catalogd.
    
    Change-Id: Ided463b2a98331d3d305bbe92fbf3a5d2e197acf
    Reviewed-on: http://gerrit.cloudera.org:8080/17707
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  10 ++
 .../events/MetastoreEventProcessorConfig.java      |   7 +-
 .../impala/catalog/events/MetastoreEvents.java     |  24 ++-
 .../catalog/events/MetastoreEventsProcessor.java   |  25 ++-
 .../events/MetastoreEventsProcessorTest.java       | 173 +++++++++++++--------
 5 files changed, 150 insertions(+), 89 deletions(-)

diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 523ca21..86a0c00 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -396,6 +396,16 @@ public class MetastoreShim {
     }
   }
 
+  // hive-3 introduces a catalog object in hive
+  // Impala only supports the default catalog of hive
+  private static final String defaultCatalogName_ = MetaStoreUtils
+      .getDefaultCatalog(MetastoreConf.newMetastoreConf());
+
+  /**
+   * Gets the name of the default catalog from metastore configuration.
+   */
+  public static String getDefaultCatalogName() { return defaultCatalogName_; }
+
   //hive-3 has a different class to encode and decode event messages
   private static final MessageEncoder eventMessageEncoder_ =
       MessageFactory.getDefaultInstance(MetastoreConf.newMetastoreConf());
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventProcessorConfig.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventProcessorConfig.java
index c846d6b..f0b7565 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventProcessorConfig.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventProcessorConfig.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog.events;
 
 import com.google.common.base.Preconditions;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
+import org.apache.impala.compat.MetastoreShim;
 
 /**
  * Metastore configurations and their expected values for event processing.
@@ -28,11 +29,9 @@ import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
  * validate depending on the hive version.
  */
 public enum MetastoreEventProcessorConfig {
-  ADD_THRIFT_OBJECTS("hive.metastore.notifications.add.thrift.objects", "true"),
-  ALTER_NOTIFICATIONS_BASIC("hive.metastore.alter.notifications.basic", "false"),
   FIRE_EVENTS_FOR_DML("hive.metastore.dml.events", "true"),
-  METASTORE_PARAMETER_EXCLUDE_PATTERNS(new EventPropertyRegexValidator(
-      "hive.metastore.notification.parameters.exclude.patterns"));
+  METASTORE_DEFAULT_CATALOG_NAME("metastore.catalog.default",
+      MetastoreShim.getDefaultCatalogName());
 
   private final ConfigValidator validator_;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 90244f7..c0195a6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -55,6 +55,7 @@ import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.TableNotLoadedException;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Reference;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
@@ -222,11 +223,21 @@ public class MetastoreEvents {
       int i = 0;
       while (i < metastoreEvents.size()) {
         MetastoreEvent currentEvent = metastoreEvents.get(i);
+        String catalogName = currentEvent.getCatalogName();
         String eventDb = currentEvent.getDbName();
         String eventTbl = currentEvent.getTableName();
-        // if the event is on blacklisted db or table we should filter it out
-        if ((eventDb != null && catalog_.isBlacklistedDb(eventDb)) || (eventTbl != null
-            && catalog_.isBlacklistedTable(eventDb, eventTbl))) {
+        if (catalogName != null && !MetastoreShim.getDefaultCatalogName()
+            .equalsIgnoreCase(catalogName)) {
+          // currently Impala doesn't support custom hive catalogs and hence we should
+          // ignore all the events which are on non-default catalog namespaces.
+          LOG.debug(currentEvent.debugString(
+              "Filtering out this event since it is on a non-default hive catalog %s",
+              catalogName));
+          metastoreEvents.remove(i);
+          numFilteredEvents++;
+        } else if ((eventDb != null && catalog_.isBlacklistedDb(eventDb)) || (
+            eventTbl != null && catalog_.isBlacklistedTable(eventDb, eventTbl))) {
+          // if the event is on blacklisted db or table we should filter it out
           String blacklistedObject = eventTbl != null ? new TableName(eventDb,
               eventTbl).toString() : eventDb;
           LOG.info(currentEvent.debugString("Filtering out this event since it is on a "
@@ -272,6 +283,9 @@ public class MetastoreEvents {
     // Logger available for all the sub-classes
     protected final Logger LOG = LoggerFactory.getLogger(this.getClass());
 
+    // catalog name from the event
+    protected final String catalogName_;
+
     // dbName from the event
     protected final String dbName_;
 
@@ -290,7 +304,6 @@ public class MetastoreEvents {
     // metrics registry so that events can add metrics
     protected final Metrics metrics_;
 
-
     MetastoreEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
         NotificationEvent event) {
       this.catalogOpExecutor_ = catalogOpExecutor;
@@ -299,6 +312,7 @@ public class MetastoreEvents {
       this.eventId_ = event_.getEventId();
       this.eventType_ = MetastoreEventType.from(event.getEventType());
       // certain event types in Hive-3 like COMMIT_TXN may not have dbName set
+      this.catalogName_ = event.getCatName();
       this.dbName_ = event.getDbName();
       this.tblName_ = event.getTableName();
       this.metastoreNotificationEvent_ = event;
@@ -307,6 +321,8 @@ public class MetastoreEvents {
 
     public long getEventId() { return eventId_; }
 
+    public String getCatalogName() { return catalogName_; }
+
     public String getDbName() { return dbName_; }
 
     public String getTableName() { return tblName_; }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index e7061a2..1823898 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.catalog.events;
 
-
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
@@ -373,16 +372,16 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
         LOG.error(msg, e);
         throw new CatalogException(msg);
       }
-      if (!validationErrors.isEmpty()) {
-        LOG.error("Found {} incorrect metastore configuration(s).",
-            validationErrors.size());
-        for (ValidationResult invalidConfig: validationErrors) {
-          LOG.error(invalidConfig.getReason());
-        }
-        throw new CatalogException(String.format("Found %d incorrect metastore "
-            + "configuration(s). Events processor cannot start. See ERROR log for more "
-            + "details.", validationErrors.size()));
+    }
+    if (!validationErrors.isEmpty()) {
+      LOG.error("Found {} incorrect metastore configuration(s).",
+          validationErrors.size());
+      for (ValidationResult invalidConfig: validationErrors) {
+        LOG.error(invalidConfig.getReason());
       }
+      throw new CatalogException(String.format("Found %d incorrect metastore "
+          + "configuration(s). Events processor cannot start. See ERROR log for more "
+          + "details.", validationErrors.size()));
     }
   }
 
@@ -393,10 +392,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
    * version
    */
   public static List<MetastoreEventProcessorConfig> getEventProcessorConfigsToValidate() {
-    if (MetastoreShim.getMajorVersion() >= 2) {
-      return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML);
-    }
-    return Arrays.asList(MetastoreEventProcessorConfig.values());
+    return Arrays.asList(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML,
+        MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME);
   }
 
   private void initMetrics() {
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 7fd3815..5dfdc1b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.impala.catalog.events;
 
-import static java.lang.Thread.sleep;
-import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.ALTER_TABLE;
-import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.CREATE_DATABASE;
-import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.DROP_DATABASE;
-import static org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType.DROP_TABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -31,6 +26,7 @@ import static org.junit.Assert.fail;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -38,7 +34,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -51,8 +46,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -88,7 +85,6 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
@@ -155,6 +151,7 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -221,6 +218,13 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  private static void dropDatabaseCascade(String catName, String dbName)
+      throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      msClient.getHiveClient().dropDatabase(catName, dbName, true, true, true);
+    }
+  }
+
   /**
    * Cleans up the test database from both metastore and catalog
    * @throws TException
@@ -273,6 +277,10 @@ public class MetastoreEventsProcessorTest {
       String configKey = config.getValidator().getConfigKey();
       Mockito.when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
           "")).thenReturn("false");
+      if (config.equals(MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME)) {
+        Mockito.when(mockMetastoreEventsProcessor.getConfigValueFromMetastore(configKey,
+            "")).thenReturn("test_custom_catalog");
+      }
     }
     try {
       mockMetastoreEventsProcessor.validateConfigs();
@@ -311,55 +319,20 @@ public class MetastoreEventsProcessorTest {
   @Test
   public void testConfigValidationWithIncorrectValues() {
     Map<MetastoreEventProcessorConfig, String> incorrectValues = new HashMap<>();
-    incorrectValues.put(MetastoreEventProcessorConfig.ADD_THRIFT_OBJECTS, "false");
-    incorrectValues.put(MetastoreEventProcessorConfig.ALTER_NOTIFICATIONS_BASIC, "true");
     incorrectValues.put(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML, "false");
+    incorrectValues
+        .put(MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME, "custom");
     for (MetastoreEventProcessorConfig config : incorrectValues.keySet()) {
       testConfigValidation(config, incorrectValues.get(config), false);
     }
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, "^impala",
-        false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, "impala*",
-        false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS,
-        "randomString1, impala.disableHmsSync, randomString2", false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS,
-        MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS,
-        "^impala.events.catalogServiceId", false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, ".*", false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, ".+", false);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, ".*disable.*",
-        false);
-
-    // check validation succeeds for correct values
-    testConfigValidation(MetastoreEventProcessorConfig.ADD_THRIFT_OBJECTS, "true", true);
-    testConfigValidation(MetastoreEventProcessorConfig.ADD_THRIFT_OBJECTS, "TRUE", true);
-    testConfigValidation(MetastoreEventProcessorConfig.ADD_THRIFT_OBJECTS, "True", true);
-
-    testConfigValidation(MetastoreEventProcessorConfig.ALTER_NOTIFICATIONS_BASIC, "false",
-        true);
-    testConfigValidation(MetastoreEventProcessorConfig.ALTER_NOTIFICATIONS_BASIC, "FALSE",
-        true);
-    testConfigValidation(MetastoreEventProcessorConfig.ALTER_NOTIFICATIONS_BASIC, "fAlse",
-        true);
-
-    testConfigValidation(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML, "true", true);
-    testConfigValidation(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML, "TRUE", true);
-    testConfigValidation(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML, "tRue", true);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, "", true);
-    testConfigValidation(
-        MetastoreEventProcessorConfig.METASTORE_PARAMETER_EXCLUDE_PATTERNS, "random",
-        true);
+    testConfigValidation(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML,
+        "true", true);
+    testConfigValidation(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML,
+        "TRUE", true);
+    testConfigValidation(MetastoreEventProcessorConfig.FIRE_EVENTS_FOR_DML,
+        "tRue", true);
+    testConfigValidation(MetastoreEventProcessorConfig.METASTORE_DEFAULT_CATALOG_NAME,
+        "hIve", true);
   }
 
   private void testConfigValidation(MetastoreEventProcessorConfig config,
@@ -384,6 +357,39 @@ public class MetastoreEventsProcessorTest {
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
   }
 
+  @Test
+  public void testIgnoreNonDefaultCatalogs() throws Exception {
+    String catName = "custom_hive_catalog";
+    try {
+      dropHiveCatalogIfExists(catName);
+      createHiveCatalog(catName);
+      createDatabase(TEST_DB_NAME, null);
+      String tblName = "test";
+      createTable(tblName, false);
+      eventsProcessor_.processEvents();
+      // create a database and table in the custom hive catalog whose name matches
+      // with one already existing in Impala
+      createDatabase(catName, TEST_DB_NAME, null);
+      createTable(catName, TEST_DB_NAME, tblName, null, false);
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      // assert that dbname and table in the default catalog exist
+      assertNotNull(catalog_.getDb(TEST_DB_NAME));
+      assertNotNull(catalog_.getTable(TEST_DB_NAME, tblName));
+      dropDatabaseCascade(catName, TEST_DB_NAME);
+      // when a catalog is created a default database is also created within it
+      dropDatabaseCascade(catName, "default");
+      dropHiveCatalogIfExists(catName);
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      // assert that dbname and table in the default catalog exist
+      assertNotNull(catalog_.getDb(TEST_DB_NAME));
+      assertNotNull(catalog_.getTable(TEST_DB_NAME, tblName));
+    } finally {
+      dropHiveCatalogIfExists(catName);
+    }
+  }
+
   /**
    * Checks that Db object does not exist after processing DROP_DATABASE event when the
    * dropped database is empty
@@ -706,12 +712,12 @@ public class MetastoreEventsProcessorTest {
     // Test insert into partition
     createDatabase(TEST_DB_NAME, null);
     String tableToInsertPart = "tbl_to_insert_part";
-    createTable(TEST_DB_NAME, tableToInsertPart, null, true);
+    createTable(TEST_DB_NAME, tableToInsertPart, true);
     testInsertEvents(TEST_DB_NAME, tableToInsertPart, true);
 
     // Test insert into table
     String tableToInsertNoPart = "tbl_to_insert_no_part";
-    createTable(TEST_DB_NAME, tableToInsertNoPart, null, false);
+    createTable(TEST_DB_NAME, tableToInsertNoPart, false);
     testInsertEvents(TEST_DB_NAME, tableToInsertNoPart,false);
   }
 
@@ -1554,7 +1560,7 @@ public class MetastoreEventsProcessorTest {
               tblTransition.first);
         }
         createDatabase(TEST_DB_NAME, dbParams);
-        createTable(TEST_DB_NAME, testTblName, tblParams, false);
+        createTable(null, TEST_DB_NAME, testTblName, tblParams, false);
         eventsProcessor_.processEvents();
         // table creation is skipped since the flag says so
         assertNull(catalog_.getTable(TEST_DB_NAME, testTblName));
@@ -1682,9 +1688,9 @@ public class MetastoreEventsProcessorTest {
     }
 
     org.apache.hadoop.hive.metastore.api.Table tableBefore =
-        getTestTable(dbName, tblName, beforeParams, false);
+        getTestTable(null, dbName, tblName, beforeParams, false);
     org.apache.hadoop.hive.metastore.api.Table tableAfter =
-        getTestTable(dbName, tblName, afterParams, false);
+        getTestTable(null, dbName, tblName, afterParams, false);
 
     Map<String, String> dbParams = new HashMap<>(1);
     if (dbFlag != null) {
@@ -1708,7 +1714,7 @@ public class MetastoreEventsProcessorTest {
     // issue a dummy alter table by adding a param
     afterParams.put("dummy", "value");
     org.apache.hadoop.hive.metastore.api.Table nextTable =
-        getTestTable(dbName, tblName, afterParams, false);
+        getTestTable(null, dbName, tblName, afterParams, false);
     NotificationEvent nextNotification =
         createFakeAlterTableNotification(dbName, tblName, tableAfter, nextTable);
     alterTableEvent =
@@ -1769,9 +1775,9 @@ public class MetastoreEventsProcessorTest {
     Map<String, String> tblParams = new HashMap<>(1);
     tblParams.put(MetastoreEventPropertyKey.DISABLE_EVENT_HMS_SYNC.getKey(), "true");
     // event 2
-    createTable(TEST_DB_NAME, "tbl_should_skipped", tblParams, true);
+    createTable(null, TEST_DB_NAME, "tbl_should_skipped", tblParams, true);
     // event 3
-    createTable(TEST_DB_NAME, testTblName, null, true);
+    createTable(null, TEST_DB_NAME, testTblName, null, true);
     List<List<String>> partitionVals = new ArrayList<>();
     partitionVals.add(Arrays.asList("1"));
     partitionVals.add(Arrays.asList("2"));
@@ -1929,7 +1935,7 @@ public class MetastoreEventsProcessorTest {
       Map<String, String> dbParams, Map<String, String> tblParams) throws Exception {
     assertNull(catalog_.getDb(dbName));
     createDatabase(dbName, dbParams);
-    createTable(dbName, tblName, tblParams, true);
+    createTable(null, dbName, tblName, tblParams, true);
     List<List<String>> partVals = new ArrayList<>(3);
     partVals.add(Arrays.asList("1"));
     partVals.add(Arrays.asList("2"));
@@ -2417,7 +2423,13 @@ public class MetastoreEventsProcessorTest {
 
   private void createDatabase(String dbName, Map<String, String> params)
       throws TException {
+    createDatabase(null, dbName, params);
+  }
+
+  private void createDatabase(String catName, String dbName, Map<String, String> params)
+      throws TException {
     Database database = new Database();
+    if (catName != null) database.setCatalogName(catName);
     database.setName(dbName);
     database.setDescription("Notification test database");
     database.setOwnerName("NotificationOwner");
@@ -2430,6 +2442,27 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  private void createHiveCatalog(String catName) throws TException {
+    Catalog catalog = new Catalog();
+    catalog.setName(catName);
+    catalog.setLocationUri(Files.createTempDir().getAbsolutePath());
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      metaStoreClient.getHiveClient().createCatalog(catalog);
+    }
+  }
+
+  private void dropHiveCatalogIfExists(String catName) throws TException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      // unfortunately the dropCatalog API doesn't seem to take ifExists or cascade flag
+      if (msClient.getHiveClient().getCatalogs().contains(catName)) {
+        for (String db : msClient.getHiveClient().getAllDatabases(catName)) {
+          msClient.getHiveClient().dropDatabase(catName, db, true, true, true);
+        }
+        msClient.getHiveClient().dropCatalog(catName);
+      }
+    }
+  }
+
   private void addDatabaseParameters(String key, String val) throws TException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       Database msDb = msClient.getHiveClient().getDatabase(TEST_DB_NAME);
@@ -2447,21 +2480,22 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
-  private void createTable(String dbName, String tblName, Map<String, String> params,
-      boolean isPartitioned) throws TException {
+  private void createTable(String catName, String dbName, String tblName,
+      Map<String, String> params, boolean isPartitioned) throws TException {
     org.apache.hadoop.hive.metastore.api.Table
-        tbl = getTestTable(dbName, tblName, params, isPartitioned);
+        tbl = getTestTable(catName, dbName, tblName, params, isPartitioned);
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       msClient.getHiveClient().createTable(tbl);
     }
   }
 
-  private org.apache.hadoop.hive.metastore.api.Table getTestTable(String dbName,
-      String tblName, Map<String, String> params, boolean isPartitioned)
+  private org.apache.hadoop.hive.metastore.api.Table getTestTable(String catName,
+      String dbName, String tblName, Map<String, String> params, boolean isPartitioned)
       throws MetaException {
     org.apache.hadoop.hive.metastore.api.Table tbl =
         new org.apache.hadoop.hive.metastore.api.Table();
+    if (catName != null) tbl.setCatName(catName);
     tbl.setDbName(dbName);
     tbl.setTableName(tblName);
     tbl.putToParameters("tblParamKey", "tblParamValue");
@@ -2973,8 +3007,13 @@ public class MetastoreEventsProcessorTest {
     return partitionDef;
   }
 
+  private void createTable(String dbName, String tblName, boolean isPartitioned)
+      throws TException {
+    createTable(null, dbName, tblName, null, isPartitioned);
+  }
+
   private void createTable(String tblName, boolean isPartitioned) throws TException {
-    createTable(TEST_DB_NAME, tblName, null, isPartitioned);
+    createTable(null, TEST_DB_NAME, tblName, null, isPartitioned);
   }
 
   private void dropTable(String tableName) throws TException {