You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/08/29 06:00:29 UTC

[hive] branch master updated: Revert "HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)"

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bfd4362ada8 Revert "HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)"
bfd4362ada8 is described below

commit bfd4362ada8ce7cf7bcbb3e77d7fbcc54d2308c9
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Aug 29 11:20:20 2023 +0530

    Revert "HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)"
    
    This reverts commit a02fe662ba1548dc5a80041d0649e131ca1be789.
---
 itests/hcatalog-unit/pom.xml                       |   1 -
 .../listener/TestDbNotificationCleanup.java        | 189 ------------------
 .../listener/TestDbNotificationListener.java       | 215 ++++++++++++++++++---
 .../TestTransactionalDbNotificationListener.java   | 169 ----------------
 4 files changed, 190 insertions(+), 384 deletions(-)

diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index 4a049697b71..5183a2ba802 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -89,7 +89,6 @@
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-standalone-metastore-server</artifactId>
-      <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
deleted file mode 100644
index e73a57ea58f..00000000000
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.listener;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import java.util.concurrent.TimeUnit;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
-import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
-
-
-
-public class TestDbNotificationCleanup {
-    private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationCleanup.class
-            .getName());
-    private static final int EVENTS_TTL = 30;
-    private static final int CLEANUP_SLEEP_TIME = 10;
-    private static Map<String, String> emptyParameters = new HashMap<String, String>();
-    private static IMetaStoreClient msClient;
-    private static IDriver driver;
-    private static MessageDeserializer md;
-    private static HiveConf conf;
-
-    private long firstEventId;
-    private final String testTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "testDbNotif").toString();
-    @SuppressWarnings("rawtypes")
-    @BeforeClass
-    public static void connectToMetastore() throws Exception {
-        conf = new HiveConf();
-
-        MetastoreConf.setVar(conf,MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS,
-                "org.apache.hive.hcatalog.listener.DbNotificationListener");
-        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-        conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
-        conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName());
-        MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, JSONMessageEncoder.class.getName());
-        MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANUP_SLEEP_TIME, TimeUnit.SECONDS);
-        MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, EVENTS_TTL, TimeUnit.SECONDS);
-        MetastoreConf.setTimeVar(conf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 20, TimeUnit.SECONDS);
-        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-                "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-        SessionState.start(new CliSessionState(conf));
-        msClient = new HiveMetaStoreClient(conf);
-        driver = DriverFactory.newDriver(conf);
-
-    }
-
-    @Before
-    public void setup() throws Exception {
-        firstEventId = msClient.getCurrentNotificationEventId().getEventId();
-        DummyRawStoreFailEvent.setEventSucceed(true);
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() {
-
-        if (msClient != null) {
-            msClient.close();
-        }
-        if (driver != null) {
-            driver.close();
-        }
-        conf = null;
-    }
-
-
-    @Test
-    public void cleanupNotifs() throws Exception {
-        Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
-        msClient.createDatabase(db);
-        msClient.dropDatabase("cleanup1");
-
-        LOG.info("Pulling events immediately after createDatabase/dropDatabase");
-        NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(2, rsp.getEventsSize());
-
-        // sleep for expiry time, and then fetch again
-        // sleep twice the TTL interval - things should have been cleaned by then.
-        Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-        LOG.info("Pulling events again after cleanup");
-        NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
-        LOG.info("second trigger done");
-        assertEquals(0, rsp2.getEventsSize());
-    }
-
-    /**
-     * Test makes sure that if you use the API {@link HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, NotificationFilter)}
-     * does not error out if the events are cleanedup.
-     */
-    @Test
-    public void skipCleanedUpEvents() throws Exception {
-        Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
-        msClient.createDatabase(db);
-        msClient.dropDatabase("cleanup1");
-
-        // sleep for expiry time, and then fetch again
-        // sleep twice the TTL interval - things should have been cleaned by then.
-        Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-        db = new Database("cleanup2", "no description", testTempDir, emptyParameters);
-        msClient.createDatabase(db);
-        msClient.dropDatabase("cleanup2");
-
-        // the firstEventId is before the cleanup happened, so we should just receive the
-        // events which remaining after cleanup.
-        NotificationEventRequest request = new NotificationEventRequest();
-        request.setLastEvent(firstEventId);
-        request.setMaxEvents(-1);
-        NotificationEventResponse rsp2 = msClient.getNextNotification(request, true, null);
-        assertEquals(2, rsp2.getEventsSize());
-        // when we pass the allowGapsInEvents as false the API should error out
-        Exception ex = null;
-        try {
-            NotificationEventResponse rsp = msClient.getNextNotification(request, false, null);
-        } catch (Exception e) {
-            ex = e;
-        }
-        assertNotNull(ex);
-    }
-
-    @Test
-    public void cleanupNotificationWithError() throws Exception {
-        Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
-        msClient.createDatabase(db);
-        msClient.dropDatabase("cleanup1");
-
-        LOG.info("Pulling events immediately after createDatabase/dropDatabase");
-        NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(2, rsp.getEventsSize());
-        //this simulates that cleaning thread will error out while cleaning the notifications
-        DummyRawStoreFailEvent.setEventSucceed(false);
-        // sleep for expiry time, and then fetch again
-        // sleep twice the TTL interval - things should have been cleaned by then.
-        Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-        LOG.info("Pulling events again after failing to cleanup");
-        NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
-        LOG.info("second trigger done");
-        assertEquals(2, rsp2.getEventsSize());
-        DummyRawStoreFailEvent.setEventSucceed(true);
-        Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-        LOG.info("Pulling events again after cleanup");
-        rsp2 = msClient.getNextNotification(firstEventId, 0, null);
-        LOG.info("third trigger done");
-        assertEquals(0, rsp2.getEventsSize());
-    }
-}
-
-
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 073fdc877f6..100ee24e1fa 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.listener;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
@@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -63,11 +67,12 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
-import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
@@ -77,6 +82,9 @@ import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -102,7 +110,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.hive.hcatalog.data.Pair;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -111,12 +118,13 @@ import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.junit.Ignore;
 
 /**
  * Tests DbNotificationListener when used as a transactional event listener
  * (hive.metastore.transactional.event.listeners)
  */
+@org.junit.Ignore("TestDbNotificationListener is unstable HIVE-23680")
 public class TestDbNotificationListener {
   private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class
       .getName());
@@ -203,11 +211,6 @@ public class TestDbNotificationListener {
       pushEventId(EventType.ALTER_TABLE, tableEvent);
     }
 
-    @Override
-    public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
-      pushEventId(EventType.ALTER_DATABASE, dbEvent);
-    }
-
     @Override
     public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
       pushEventId(EventType.ADD_PARTITION, partitionEvent);
@@ -248,6 +251,18 @@ public class TestDbNotificationListener {
       pushEventId(EventType.INSERT, insertEvent);
     }
 
+    public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
+      pushEventId(EventType.OPEN_TXN, openTxnEvent);
+    }
+
+    public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException {
+      pushEventId(EventType.COMMIT_TXN, commitTxnEvent);
+    }
+
+    public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
+      pushEventId(EventType.ABORT_TXN, abortTxnEvent);
+    }
+
     public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException {
       pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
     }
@@ -297,18 +312,6 @@ public class TestDbNotificationListener {
     DummyRawStoreFailEvent.setEventSucceed(true);
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() {
-
-    if (msClient != null) {
-      msClient.close();
-    }
-    if (driver != null) {
-      driver.close();
-    }
-
-  }
-
   @After
   public void tearDown() {
     MockMetaStoreEventListener.clearEvents();
@@ -389,8 +392,6 @@ public class TestDbNotificationListener {
     String newDesc = "test database";
     Database dbAfter = dbBefore.deepCopy();
     dbAfter.setDescription(newDesc);
-    dbAfter.setOwnerName("test2");
-    dbAfter.setOwnerType(PrincipalType.USER);
     msClient.alterDatabase(dbName, dbAfter);
     dbAfter = msClient.getDatabase(dbName);
 
@@ -1070,6 +1071,88 @@ public class TestDbNotificationListener {
     testEventCounts(defaultDbName, firstEventId, null, null, 3);
   }
 
+  @Test
+  public void openTxn() throws Exception {
+    msClient.openTxn("me", TxnType.READ_ONLY);
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.openTxn("me", TxnType.DEFAULT);
+    rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 1, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
+  }
+
+  @Test
+  public void abortTxn() throws Exception {
+    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    msClient.abortTxns(Collections.singletonList(txnId1));
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.abortTxns(Collections.singletonList(txnId2));
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+  }
+
+  @Test
+  public void rollbackTxn() throws Exception {
+    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    msClient.rollbackTxn(txnId1);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.rollbackTxn(txnId2);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+  }
+
+  @Test
+  public void commitTxn() throws Exception {
+    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    msClient.commitTxn(txnId1);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.commitTxn(txnId2);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
+  }
+
   @Test
   public void insertTable() throws Exception {
     String defaultDbName = "default";
@@ -1337,6 +1420,7 @@ public class TestDbNotificationListener {
   }
 
   @Test
+  @Ignore("HIVE-23401")
   public void sqlInsertTable() throws Exception {
     String defaultDbName = "default";
     String tblName = "sqlins";
@@ -1448,7 +1532,7 @@ public class TestDbNotificationListener {
     // Event 5, 6, 7
     driver.run("insert into table " + tblName + " partition (ds = 'today') values (2)");
     // Event 8, 9, 10
-    driver.run("insert into table " + tblName + " partition (ds = 'today') values (3)");
+    driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
     // Event 9, 10
     driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
 
@@ -1461,9 +1545,9 @@ public class TestDbNotificationListener {
     // Event 10, 11, 12
     driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
     // Event 12, 13, 14
-    driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (3)");
+    driver.run("insert into table " + tblName + " partition (ds) values (3, 'yesterday')");
     // Event 15, 16, 17
-    driver.run("insert into table " + tblName + " partition (ds = 'tomorrow') values (2)");
+    driver.run("insert into table " + tblName + " partition (ds) values (3, 'tomorrow')");
     // Event 18
     driver.run("alter table " + tblName + " drop partition (ds = 'tomorrow')");
     // Event 19, 20, 21
@@ -1580,5 +1664,86 @@ public class TestDbNotificationListener {
     assertTrue(files.hasNext());
   }
 
+  @Test
+  public void cleanupNotifs() throws Exception {
+    Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup1");
 
+    LOG.info("Pulling events immediately after createDatabase/dropDatabase");
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    // sleep for expiry time, and then fetch again
+    // sleep twice the TTL interval - things should have been cleaned by then.
+    Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+    LOG.info("Pulling events again after cleanup");
+    NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+    LOG.info("second trigger done");
+    assertEquals(0, rsp2.getEventsSize());
+  }
+
+  /**
+   * Makes sure that the API {@link HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, NotificationFilter)}
+   * does not error out if the events have been cleaned up.
+   */
+  @Test
+  public void skipCleanedUpEvents() throws Exception {
+    Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup1");
+
+    // sleep for expiry time, and then fetch again
+    // sleep twice the TTL interval - things should have been cleaned by then.
+    Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+    db = new Database("cleanup2", "no description", testTempDir, emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup2");
+
+    // firstEventId predates the cleanup, so we should receive only the
+    // events that remain after the cleanup.
+    NotificationEventRequest request = new NotificationEventRequest();
+    request.setLastEvent(firstEventId);
+    request.setMaxEvents(-1);
+    NotificationEventResponse rsp2 = msClient.getNextNotification(request, true, null);
+    assertEquals(2, rsp2.getEventsSize());
+    // when we pass the allowGapsInEvents as false the API should error out
+    Exception ex = null;
+    try {
+      NotificationEventResponse rsp = msClient.getNextNotification(request, false, null);
+    } catch (Exception e) {
+      ex = e;
+    }
+    assertNotNull(ex);
+  }
+
+  @Test
+  public void cleanupNotificationWithError() throws Exception {
+    Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup1");
+
+    LOG.info("Pulling events immediately after createDatabase/dropDatabase");
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+    //this simulates that cleaning thread will error out while cleaning the notifications
+    DummyRawStoreFailEvent.setEventSucceed(false);
+    // sleep for expiry time, and then fetch again
+    // sleep twice the TTL interval - the cleaner should have run (and failed) by then.
+    Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+    LOG.info("Pulling events again after failing to cleanup");
+    NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+    LOG.info("second trigger done");
+    assertEquals(2, rsp2.getEventsSize());
+    DummyRawStoreFailEvent.setEventSucceed(true);
+    Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+    LOG.info("Pulling events again after cleanup");
+    rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+    LOG.info("third trigger done");
+    assertEquals(0, rsp2.getEventsSize());
+  }
 }
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
deleted file mode 100644
index 3b9853684a4..00000000000
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.listener;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.util.Collections;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.TxnType;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-
-
-public class TestTransactionalDbNotificationListener {
-    private static IMetaStoreClient msClient;
-    private static IDriver driver;
-
-    private int startTime;
-    private long firstEventId;
-
-
-    @SuppressWarnings("rawtypes")
-    @BeforeClass
-    public static void connectToMetastore() throws Exception {
-        HiveConf conf = new HiveConf();
-        conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
-                "org.apache.hive.hcatalog.listener.DbNotificationListener");
-        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-        conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
-        conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName());
-        MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, JSONMessageEncoder.class.getName());
-        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-                "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-        SessionState.start(new CliSessionState(conf));
-        TestTxnDbUtil.setConfValues(conf);
-        TestTxnDbUtil.prepDb(conf);
-        msClient = new HiveMetaStoreClient(conf);
-        driver = DriverFactory.newDriver(conf);
-
-    }
-
-    @Before
-    public void setup() throws Exception {
-        long now = System.currentTimeMillis() / 1000;
-        startTime = 0;
-        if (now > Integer.MAX_VALUE) {
-            fail("Bummer, time has fallen over the edge");
-        } else {
-            startTime = (int) now;
-        }
-        firstEventId = msClient.getCurrentNotificationEventId().getEventId();
-        DummyRawStoreFailEvent.setEventSucceed(true);
-    }
-
-    @Test
-    public void openTxn() throws Exception {
-        msClient.openTxn("me", TxnType.READ_ONLY);
-        NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(0, rsp.getEventsSize());
-
-        msClient.openTxn("me", TxnType.DEFAULT);
-        rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        NotificationEvent event = rsp.getEvents().get(0);
-        assertEquals(firstEventId + 1, event.getEventId());
-        assertTrue(event.getEventTime() >= startTime);
-        assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
-    }
-
-    @Test
-    public void abortTxn() throws Exception {
-
-        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
-        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
-        NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        msClient.abortTxns(Collections.singletonList(txnId1));
-        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-        assertEquals(0, rsp.getEventsSize());
-
-        msClient.abortTxns(Collections.singletonList(txnId2));
-        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        NotificationEvent event = rsp.getEvents().get(0);
-        assertEquals(firstEventId + 2, event.getEventId());
-        assertTrue(event.getEventTime() >= startTime);
-        assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
-    }
-
-    @Test
-    public void rollbackTxn() throws Exception {
-        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
-        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
-        NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        msClient.rollbackTxn(txnId1);
-        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-        assertEquals(0, rsp.getEventsSize());
-
-        msClient.rollbackTxn(txnId2);
-        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        NotificationEvent event = rsp.getEvents().get(0);
-        assertEquals(firstEventId + 2, event.getEventId());
-        assertTrue(event.getEventTime() >= startTime);
-        assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
-    }
-
-    @Test
-    public void commitTxn() throws Exception {
-        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
-        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
-        NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        msClient.commitTxn(txnId1);
-        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-        assertEquals(0, rsp.getEventsSize());
-
-        msClient.commitTxn(txnId2);
-        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-        assertEquals(1, rsp.getEventsSize());
-
-        NotificationEvent event = rsp.getEvents().get(0);
-        assertEquals(firstEventId + 2, event.getEventId());
-        assertTrue(event.getEventTime() >= startTime);
-        assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
-    }
-
-}
\ No newline at end of file