You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/08/29 06:00:29 UTC
[hive] branch master updated: Revert "HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)"
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bfd4362ada8 Revert "HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)"
bfd4362ada8 is described below
commit bfd4362ada8ce7cf7bcbb3e77d7fbcc54d2308c9
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Tue Aug 29 11:20:20 2023 +0530
Revert "HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)"
This reverts commit a02fe662ba1548dc5a80041d0649e131ca1be789.
---
itests/hcatalog-unit/pom.xml | 1 -
.../listener/TestDbNotificationCleanup.java | 189 ------------------
.../listener/TestDbNotificationListener.java | 215 ++++++++++++++++++---
.../TestTransactionalDbNotificationListener.java | 169 ----------------
4 files changed, 190 insertions(+), 384 deletions(-)
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index 4a049697b71..5183a2ba802 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -89,7 +89,6 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-standalone-metastore-server</artifactId>
- <classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
deleted file mode 100644
index e73a57ea58f..00000000000
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.listener;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import java.util.concurrent.TimeUnit;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
-import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.AfterClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
-
-
-
-public class TestDbNotificationCleanup {
- private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationCleanup.class
- .getName());
- private static final int EVENTS_TTL = 30;
- private static final int CLEANUP_SLEEP_TIME = 10;
- private static Map<String, String> emptyParameters = new HashMap<String, String>();
- private static IMetaStoreClient msClient;
- private static IDriver driver;
- private static MessageDeserializer md;
- private static HiveConf conf;
-
- private long firstEventId;
- private final String testTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "testDbNotif").toString();
- @SuppressWarnings("rawtypes")
- @BeforeClass
- public static void connectToMetastore() throws Exception {
- conf = new HiveConf();
-
- MetastoreConf.setVar(conf,MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS,
- "org.apache.hive.hcatalog.listener.DbNotificationListener");
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
- conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName());
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, JSONMessageEncoder.class.getName());
- MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANUP_SLEEP_TIME, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, EVENTS_TTL, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(conf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 20, TimeUnit.SECONDS);
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- SessionState.start(new CliSessionState(conf));
- msClient = new HiveMetaStoreClient(conf);
- driver = DriverFactory.newDriver(conf);
-
- }
-
- @Before
- public void setup() throws Exception {
- firstEventId = msClient.getCurrentNotificationEventId().getEventId();
- DummyRawStoreFailEvent.setEventSucceed(true);
- }
-
- @AfterClass
- public static void tearDownAfterClass() {
-
- if (msClient != null) {
- msClient.close();
- }
- if (driver != null) {
- driver.close();
- }
- conf = null;
- }
-
-
- @Test
- public void cleanupNotifs() throws Exception {
- Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
- msClient.createDatabase(db);
- msClient.dropDatabase("cleanup1");
-
- LOG.info("Pulling events immediately after createDatabase/dropDatabase");
- NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(2, rsp.getEventsSize());
-
- // sleep for expiry time, and then fetch again
- // sleep twice the TTL interval - things should have been cleaned by then.
- Thread.sleep(EVENTS_TTL * 2 * 1000);
-
- LOG.info("Pulling events again after cleanup");
- NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
- LOG.info("second trigger done");
- assertEquals(0, rsp2.getEventsSize());
- }
-
- /**
- * Test makes sure that if you use the API {@link HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, NotificationFilter)}
- * does not error out if the events are cleanedup.
- */
- @Test
- public void skipCleanedUpEvents() throws Exception {
- Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
- msClient.createDatabase(db);
- msClient.dropDatabase("cleanup1");
-
- // sleep for expiry time, and then fetch again
- // sleep twice the TTL interval - things should have been cleaned by then.
- Thread.sleep(EVENTS_TTL * 2 * 1000);
-
- db = new Database("cleanup2", "no description", testTempDir, emptyParameters);
- msClient.createDatabase(db);
- msClient.dropDatabase("cleanup2");
-
- // the firstEventId is before the cleanup happened, so we should just receive the
- // events which remaining after cleanup.
- NotificationEventRequest request = new NotificationEventRequest();
- request.setLastEvent(firstEventId);
- request.setMaxEvents(-1);
- NotificationEventResponse rsp2 = msClient.getNextNotification(request, true, null);
- assertEquals(2, rsp2.getEventsSize());
- // when we pass the allowGapsInEvents as false the API should error out
- Exception ex = null;
- try {
- NotificationEventResponse rsp = msClient.getNextNotification(request, false, null);
- } catch (Exception e) {
- ex = e;
- }
- assertNotNull(ex);
- }
-
- @Test
- public void cleanupNotificationWithError() throws Exception {
- Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
- msClient.createDatabase(db);
- msClient.dropDatabase("cleanup1");
-
- LOG.info("Pulling events immediately after createDatabase/dropDatabase");
- NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(2, rsp.getEventsSize());
- //this simulates that cleaning thread will error out while cleaning the notifications
- DummyRawStoreFailEvent.setEventSucceed(false);
- // sleep for expiry time, and then fetch again
- // sleep twice the TTL interval - things should have been cleaned by then.
- Thread.sleep(EVENTS_TTL * 2 * 1000);
-
- LOG.info("Pulling events again after failing to cleanup");
- NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
- LOG.info("second trigger done");
- assertEquals(2, rsp2.getEventsSize());
- DummyRawStoreFailEvent.setEventSucceed(true);
- Thread.sleep(EVENTS_TTL * 2 * 1000);
-
- LOG.info("Pulling events again after cleanup");
- rsp2 = msClient.getNextNotification(firstEventId, 0, null);
- LOG.info("third trigger done");
- assertEquals(0, rsp2.getEventsSize());
- }
-}
-
-
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 073fdc877f6..100ee24e1fa 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -19,6 +19,7 @@
package org.apache.hive.hcatalog.listener;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@@ -28,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
import org.apache.hadoop.hive.metastore.TableType;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -63,11 +67,12 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
-import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
@@ -77,6 +82,9 @@ import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -102,7 +110,6 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
import org.apache.hive.hcatalog.data.Pair;
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -111,12 +118,13 @@ import org.junit.Test;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.junit.Ignore;
/**
* Tests DbNotificationListener when used as a transactional event listener
* (hive.metastore.transactional.event.listeners)
*/
+@org.junit.Ignore("TestDbNotificationListener is unstable HIVE-23680")
public class TestDbNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class
.getName());
@@ -203,11 +211,6 @@ public class TestDbNotificationListener {
pushEventId(EventType.ALTER_TABLE, tableEvent);
}
- @Override
- public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
- pushEventId(EventType.ALTER_DATABASE, dbEvent);
- }
-
@Override
public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
pushEventId(EventType.ADD_PARTITION, partitionEvent);
@@ -248,6 +251,18 @@ public class TestDbNotificationListener {
pushEventId(EventType.INSERT, insertEvent);
}
+ public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
+ pushEventId(EventType.OPEN_TXN, openTxnEvent);
+ }
+
+ public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException {
+ pushEventId(EventType.COMMIT_TXN, commitTxnEvent);
+ }
+
+ public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
+ pushEventId(EventType.ABORT_TXN, abortTxnEvent);
+ }
+
public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException {
pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
}
@@ -297,18 +312,6 @@ public class TestDbNotificationListener {
DummyRawStoreFailEvent.setEventSucceed(true);
}
- @AfterClass
- public static void tearDownAfterClass() {
-
- if (msClient != null) {
- msClient.close();
- }
- if (driver != null) {
- driver.close();
- }
-
- }
-
@After
public void tearDown() {
MockMetaStoreEventListener.clearEvents();
@@ -389,8 +392,6 @@ public class TestDbNotificationListener {
String newDesc = "test database";
Database dbAfter = dbBefore.deepCopy();
dbAfter.setDescription(newDesc);
- dbAfter.setOwnerName("test2");
- dbAfter.setOwnerType(PrincipalType.USER);
msClient.alterDatabase(dbName, dbAfter);
dbAfter = msClient.getDatabase(dbName);
@@ -1070,6 +1071,88 @@ public class TestDbNotificationListener {
testEventCounts(defaultDbName, firstEventId, null, null, 3);
}
+ @Test
+ public void openTxn() throws Exception {
+ msClient.openTxn("me", TxnType.READ_ONLY);
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.openTxn("me", TxnType.DEFAULT);
+ rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 1, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
+ }
+
+ @Test
+ public void abortTxn() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.abortTxns(Collections.singletonList(txnId1));
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.abortTxns(Collections.singletonList(txnId2));
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+ }
+
+ @Test
+ public void rollbackTxn() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.rollbackTxn(txnId1);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.rollbackTxn(txnId2);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+ }
+
+ @Test
+ public void commitTxn() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.commitTxn(txnId1);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.commitTxn(txnId2);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
+ }
+
@Test
public void insertTable() throws Exception {
String defaultDbName = "default";
@@ -1337,6 +1420,7 @@ public class TestDbNotificationListener {
}
@Test
+ @Ignore("HIVE-23401")
public void sqlInsertTable() throws Exception {
String defaultDbName = "default";
String tblName = "sqlins";
@@ -1448,7 +1532,7 @@ public class TestDbNotificationListener {
// Event 5, 6, 7
driver.run("insert into table " + tblName + " partition (ds = 'today') values (2)");
// Event 8, 9, 10
- driver.run("insert into table " + tblName + " partition (ds = 'today') values (3)");
+ driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
// Event 9, 10
driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
@@ -1461,9 +1545,9 @@ public class TestDbNotificationListener {
// Event 10, 11, 12
driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
// Event 12, 13, 14
- driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (3)");
+ driver.run("insert into table " + tblName + " partition (ds) values (3, 'yesterday')");
// Event 15, 16, 17
- driver.run("insert into table " + tblName + " partition (ds = 'tomorrow') values (2)");
+ driver.run("insert into table " + tblName + " partition (ds) values (3, 'tomorrow')");
// Event 18
driver.run("alter table " + tblName + " drop partition (ds = 'tomorrow')");
// Event 19, 20, 21
@@ -1580,5 +1664,86 @@ public class TestDbNotificationListener {
assertTrue(files.hasNext());
}
+ @Test
+ public void cleanupNotifs() throws Exception {
+ Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("cleanup1");
+ LOG.info("Pulling events immediately after createDatabase/dropDatabase");
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(2, rsp.getEventsSize());
+
+ // sleep for expiry time, and then fetch again
+ // sleep twice the TTL interval - things should have been cleaned by then.
+ Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+ LOG.info("Pulling events again after cleanup");
+ NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+ LOG.info("second trigger done");
+ assertEquals(0, rsp2.getEventsSize());
+ }
+
+ /**
+ * Verifies that {@link HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, NotificationFilter)}
+ * does not error out when some of the requested events have already been cleaned up.
+ */
+ @Test
+ public void skipCleanedUpEvents() throws Exception {
+ Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("cleanup1");
+
+ // sleep for expiry time, and then fetch again
+ // sleep twice the TTL interval - things should have been cleaned by then.
+ Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+ db = new Database("cleanup2", "no description", testTempDir, emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("cleanup2");
+
+ // the firstEventId is before the cleanup happened, so we should just receive the
+ // events which remain after cleanup.
+ NotificationEventRequest request = new NotificationEventRequest();
+ request.setLastEvent(firstEventId);
+ request.setMaxEvents(-1);
+ NotificationEventResponse rsp2 = msClient.getNextNotification(request, true, null);
+ assertEquals(2, rsp2.getEventsSize());
+ // when we pass the allowGapsInEvents as false the API should error out
+ Exception ex = null;
+ try {
+ NotificationEventResponse rsp = msClient.getNextNotification(request, false, null);
+ } catch (Exception e) {
+ ex = e;
+ }
+ assertNotNull(ex);
+ }
+
+ @Test
+ public void cleanupNotificationWithError() throws Exception {
+ Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("cleanup1");
+
+ LOG.info("Pulling events immediately after createDatabase/dropDatabase");
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(2, rsp.getEventsSize());
+ // this simulates the cleaning thread erroring out while cleaning the notifications
+ DummyRawStoreFailEvent.setEventSucceed(false);
+ // sleep for expiry time, and then fetch again
+ // sleep twice the TTL interval - things should have been cleaned by then.
+ Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+ LOG.info("Pulling events again after failing to cleanup");
+ NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+ LOG.info("second trigger done");
+ assertEquals(2, rsp2.getEventsSize());
+ DummyRawStoreFailEvent.setEventSucceed(true);
+ Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+ LOG.info("Pulling events again after cleanup");
+ rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+ LOG.info("third trigger done");
+ assertEquals(0, rsp2.getEventsSize());
+ }
}
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
deleted file mode 100644
index 3b9853684a4..00000000000
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.listener;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import java.util.Collections;
-import org.apache.hadoop.hive.cli.CliSessionState;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.TxnType;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
-import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-
-
-public class TestTransactionalDbNotificationListener {
- private static IMetaStoreClient msClient;
- private static IDriver driver;
-
- private int startTime;
- private long firstEventId;
-
-
- @SuppressWarnings("rawtypes")
- @BeforeClass
- public static void connectToMetastore() throws Exception {
- HiveConf conf = new HiveConf();
- conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
- "org.apache.hive.hcatalog.listener.DbNotificationListener");
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
- conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName());
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, JSONMessageEncoder.class.getName());
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- SessionState.start(new CliSessionState(conf));
- TestTxnDbUtil.setConfValues(conf);
- TestTxnDbUtil.prepDb(conf);
- msClient = new HiveMetaStoreClient(conf);
- driver = DriverFactory.newDriver(conf);
-
- }
-
- @Before
- public void setup() throws Exception {
- long now = System.currentTimeMillis() / 1000;
- startTime = 0;
- if (now > Integer.MAX_VALUE) {
- fail("Bummer, time has fallen over the edge");
- } else {
- startTime = (int) now;
- }
- firstEventId = msClient.getCurrentNotificationEventId().getEventId();
- DummyRawStoreFailEvent.setEventSucceed(true);
- }
-
- @Test
- public void openTxn() throws Exception {
- msClient.openTxn("me", TxnType.READ_ONLY);
- NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(0, rsp.getEventsSize());
-
- msClient.openTxn("me", TxnType.DEFAULT);
- rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- NotificationEvent event = rsp.getEvents().get(0);
- assertEquals(firstEventId + 1, event.getEventId());
- assertTrue(event.getEventTime() >= startTime);
- assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
- }
-
- @Test
- public void abortTxn() throws Exception {
-
- long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
- long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
- NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- msClient.abortTxns(Collections.singletonList(txnId1));
- rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
- assertEquals(0, rsp.getEventsSize());
-
- msClient.abortTxns(Collections.singletonList(txnId2));
- rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- NotificationEvent event = rsp.getEvents().get(0);
- assertEquals(firstEventId + 2, event.getEventId());
- assertTrue(event.getEventTime() >= startTime);
- assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
- }
-
- @Test
- public void rollbackTxn() throws Exception {
- long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
- long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
- NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- msClient.rollbackTxn(txnId1);
- rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
- assertEquals(0, rsp.getEventsSize());
-
- msClient.rollbackTxn(txnId2);
- rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- NotificationEvent event = rsp.getEvents().get(0);
- assertEquals(firstEventId + 2, event.getEventId());
- assertTrue(event.getEventTime() >= startTime);
- assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
- }
-
- @Test
- public void commitTxn() throws Exception {
- long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
- long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
- NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- msClient.commitTxn(txnId1);
- rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
- assertEquals(0, rsp.getEventsSize());
-
- msClient.commitTxn(txnId2);
- rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
- assertEquals(1, rsp.getEventsSize());
-
- NotificationEvent event = rsp.getEvents().get(0);
- assertEquals(firstEventId + 2, event.getEventId());
- assertTrue(event.getEventTime() >= startTime);
- assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
- }
-
-}
\ No newline at end of file