You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@hive.apache.org by pr...@apache.org on 2021/11/19 18:16:05 UTC
[hive] branch master updated: HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
This is an automated email from the ASF dual-hosted git repository.
pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e8ba31a HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
e8ba31a is described below
commit e8ba31a7dcc35cc5744423614ed7474876dc563a
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Nov 19 23:45:03 2021 +0530
HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
.../hcatalog/listener/DbNotificationListener.java | 34 ++++++++++
.../ql/parse/TestReplWithJsonMessageFormat.java | 1 +
.../hive/ql/parse/TestReplicationScenarios.java | 72 ++++++++++++++++++++--
.../hadoop/hive/metastore/conf/MetastoreConf.java | 3 +
4 files changed, 106 insertions(+), 4 deletions(-)
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4f442ce..7980d53 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -142,6 +142,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
/**
* An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
@@ -254,6 +255,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
cleaner.setCleanupInterval(MetastoreConf.getTimeVar(getConf(),
MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS));
}
+
+    // Refresh the cleaner's start-up wait when the property is updated under either of its names.
+    // NOTE(review): CleanerThread.run() consults waitInterval only before its first cleanup pass,
+    // so presumably a change here does not affect a cleaner that has already started waiting/cleaning.
+    if (key.equals(EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.toString()) || key
+        .equals(EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.getHiveName())) {
+      cleaner.setWaitInterval(MetastoreConf
+          .getTimeVar(getConf(), EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL,
+              TimeUnit.MILLISECONDS));
+    }
}
/**
@@ -1406,6 +1414,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
private final RawStore rs;
private int ttl;
private long sleepTime;
+    // Milliseconds run() sleeps before its first cleanup pass; values <= 0 skip the wait.
+    private long waitInterval;
+    // Value of HIVE_IN_TEST_REPL; run() restores the interrupt flag only when this is true.
+    private boolean isInTest;
CleanerThread(Configuration conf, RawStore rs) {
super("DB-Notification-Cleaner");
@@ -1413,14 +1423,34 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
this.rs = Objects.requireNonNull(rs);
boolean isReplEnabled = MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED);
+    // Used by run() to decide whether to restore the interrupt flag if the start-up wait is interrupted.
+    isInTest = conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, false);
ConfVars ttlConf = (isReplEnabled) ? ConfVars.REPL_EVENT_DB_LISTENER_TTL : ConfVars.EVENT_DB_LISTENER_TTL;
setTimeToLive(MetastoreConf.getTimeVar(conf, ttlConf, TimeUnit.SECONDS));
setCleanupInterval(
MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS));
+    // One-off delay (converted to milliseconds) that run() applies before deleting any events.
+    setWaitInterval(MetastoreConf
+        .getTimeVar(conf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, TimeUnit.MILLISECONDS));
}
@Override
public void run() {
+      // Start-up wait: sleep waitInterval ms before the first cleanup pass so that events are not
+      // deleted as soon as the cleaner thread (re)starts.
+      LOG.info("Wait interval is {}", waitInterval);
+      if (waitInterval > 0) {
+        try {
+          LOG.info("Cleaner Thread Restarted and {} or {} is configured. So cleaner thread will startup post waiting "
+              + "{} ms", EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL,
+              EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.getHiveName(), waitInterval);
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          // An interrupted wait ends the thread (run() returns) without running any cleanup pass.
+          LOG.error("Failed during the initial wait before start.", e);
+          // NOTE(review): the interrupt flag is restored only when HIVE_IN_TEST_REPL is set; the usual
+          // practice is to restore it unconditionally (harmless here, since run() returns right after).
+          if(isInTest) {
+            Thread.currentThread().interrupt();
+          }
+          return;
+        }
+        LOG.info("Completed Cleaner thread initial wait. Starting normal processing.");
+      }
+
while (true) {
LOG.debug("Cleaner thread running");
try {
@@ -1448,5 +1478,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public void setCleanupInterval(long configInterval) {
sleepTime = configInterval;
}
+
+    /**
+     * Sets how long, in milliseconds, run() sleeps before its first cleanup pass.
+     * Non-positive values disable the start-up wait. Only affects a run() that has not yet
+     * begun its start-up wait.
+     */
+    public void setWaitInterval(long intervalMs) {
+      waitInterval = intervalMs;
+    }
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
index 19a56de..dc22be2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -36,6 +36,7 @@ public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
new ReplicationV1CompatRule(metaStoreClient, hconf, new ArrayList<String>() {{
add("testEventFilters");
add("testReplConfiguredCleanupOfNotificationEvents");
+ add("testCleanerThreadStartupWait");
}});
@BeforeClass
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 62f1dd9..61f61a8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -137,6 +137,8 @@ import org.apache.logging.log4j.LogManager;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
@@ -252,6 +254,8 @@ public class TestReplicationScenarios {
metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror);
PersistenceManagerProvider.setTwoMetastoreTesting(true);
+ // Disable the cleaner start-up wait (default 1 day) so tests that call
+ // DbNotificationListener.resetCleaner(hconf) see expired events cleaned without that delay.
+ MetastoreConf.setTimeVar(hconf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconfMirror, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0, TimeUnit.SECONDS);
}
@AfterClass
@@ -2437,7 +2441,7 @@ public class TestReplicationScenarios {
// For next run, CM is enabled, set REPL_EVENT_DB_LISTENER_TTL to low value for events to get deleted
MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds , TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds , TimeUnit.SECONDS);
DbNotificationListener.resetCleaner(hconf);
run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
@@ -2465,7 +2469,7 @@ public class TestReplicationScenarios {
// First check with high ttl
MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false);
MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS);
DbNotificationListener.resetCleaner(hconf);
run("CREATE TABLE " + dbName
@@ -2491,7 +2495,7 @@ public class TestReplicationScenarios {
//With CM disabled, set a low ttl for events to get deleted
MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false);
MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS);
DbNotificationListener.resetCleaner(hconf);
run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
@@ -2514,13 +2518,73 @@ public class TestReplicationScenarios {
//restore original values
MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, true);
MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 86400, TimeUnit.SECONDS);
- MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS);
MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 7200, TimeUnit.SECONDS);
DbNotificationListener.resetCleaner(hconf);
verifySetupSteps = verifySetupOriginal;
}
@Test
+ public void testCleanerThreadStartupWait() throws Exception {
+ // Verifies that a (re)started notification cleaner waits for the configured start-up interval
+ // before deleting events, even when those events are already older than the TTL.
+ int eventsTtl = 20;
+ HiveConf newConf = new HiveConf(hconf);
+
+ // Set TTL short enough for testing.
+ MetastoreConf.setTimeVar(newConf, REPL_EVENT_DB_LISTENER_TTL, eventsTtl, TimeUnit.SECONDS);
+
+ // Start-up wait of 5 * TTL, so expired events must survive for a while after a restart.
+ MetastoreConf.setTimeVar(newConf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, eventsTtl * 5, TimeUnit.SECONDS);
+
+ // Once the start-up wait is over, run the cleaner every 10 ms.
+ MetastoreConf
+ .setTimeVar(newConf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 10, TimeUnit.MILLISECONDS);
+ newConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true);
+
+ try {
+   // Reset the cleaner so that it starts with an initial wait.
+   DbNotificationListener.resetCleaner(newConf);
+
+   run("create database referenceDb", driver);
+   long firstEventId = metaStoreClient.getCurrentNotificationEventId().getEventId();
+
+   run("create database cleanupStartup", driver);
+   run("drop database cleanupStartup", driver);
+   LOG.info("Done with creating events.");
+
+   // Check events are pushed into notification logs.
+   NotificationEventResponse rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
+   assertEquals(2, rsp.getEventsSize());
+
+   // Simulate a restart: the new cleaner thread starts with an initial wait.
+   DbNotificationListener.resetCleaner(newConf);
+
+   // After one TTL the events have expired, but the cleaner is still waiting.
+   Thread.sleep(eventsTtl * 1000);
+   rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
+   assertEquals(2, rsp.getEventsSize());
+
+   // After two TTLs we are still inside the 5 * TTL start-up wait.
+   Thread.sleep(eventsTtl * 1000);
+   rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
+   assertEquals(2, rsp.getEventsSize());
+
+   // Sleep past the start-up wait (6 * TTL in total); the expired events should now be cleaned.
+   Thread.sleep(eventsTtl * 4000);
+   rsp = metaStoreClient.getNextNotification(firstEventId, 0, null);
+   assertEquals(0, rsp.getEventsSize());
+ } finally {
+   // Restore the original cleaner configuration even when an assertion fails, so later
+   // tests do not inherit the short TTL and the start-up wait.
+   DbNotificationListener.resetCleaner(hconf);
+   run("drop database if exists referenceDb", driver);
+ }
+ }
+
+ @Test
public void testIncrementalInsertToPartition() throws IOException {
String testName = "incrementalInsertToPartition";
String dbName = createDB(testName, driver);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index e42493b..1738ab4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -752,6 +752,9 @@ public class MetastoreConf {
EVENT_DB_LISTENER_CLEAN_INTERVAL("metastore.event.db.listener.clean.interval",
"hive.metastore.event.db.listener.clean.interval", 7200, TimeUnit.SECONDS,
"sleep interval between each run for cleanup of events from the database listener queue"),
+    EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL("metastore.event.db.listener.clean.startup.wait.interval",
+        "hive.metastore.event.db.listener.clean.startup.wait.interval", 1, TimeUnit.DAYS,
+        "Time the notification event cleaner thread waits after it starts (for example after a metastore\n" +
+        "restart) before it begins deleting expired events from the database listener queue. This\n" +
+        "prevents events from being purged immediately after a restart. A value of 0 disables the wait."),
EVENT_DB_NOTIFICATION_API_AUTH("metastore.metastore.event.db.notification.api.auth",
"hive.metastore.event.db.notification.api.auth", true,
"Should metastore do authorization against database notification related APIs such as get_next_notification.\n" +