You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2021/11/19 18:16:05 UTC

[hive] branch master updated: HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e8ba31a  HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
e8ba31a is described below

commit e8ba31a7dcc35cc5744423614ed7474876dc563a
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Nov 19 23:45:03 2021 +0530

    HIVE-25700: Prevent deletion of Notification Events post restarts. (Ayush Saxena, reviewed by Pravin Kumar Sinha)
---
 .../hcatalog/listener/DbNotificationListener.java  | 34 ++++++++++
 .../ql/parse/TestReplWithJsonMessageFormat.java    |  1 +
 .../hive/ql/parse/TestReplicationScenarios.java    | 72 ++++++++++++++++++++--
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  3 +
 4 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 4f442ce..7980d53 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -142,6 +142,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
 
 /**
  * An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
@@ -254,6 +255,13 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       cleaner.setCleanupInterval(MetastoreConf.getTimeVar(getConf(),
               MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS));
     }
+
+    if (key.equals(EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.toString()) || key
+        .equals(EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.getHiveName())) {
+      cleaner.setWaitInterval(MetastoreConf
+          .getTimeVar(getConf(), EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL,
+              TimeUnit.MILLISECONDS));
+    }
   }
 
   /**
@@ -1406,6 +1414,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     private final RawStore rs;
     private int ttl;
     private long sleepTime;
+    private long waitInterval;
+    private boolean isInTest;
 
     CleanerThread(Configuration conf, RawStore rs) {
       super("DB-Notification-Cleaner");
@@ -1413,14 +1423,34 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       this.rs = Objects.requireNonNull(rs);
 
       boolean isReplEnabled = MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED);
+      isInTest = conf.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, false);
       ConfVars ttlConf = (isReplEnabled) ?  ConfVars.REPL_EVENT_DB_LISTENER_TTL : ConfVars.EVENT_DB_LISTENER_TTL;
       setTimeToLive(MetastoreConf.getTimeVar(conf, ttlConf, TimeUnit.SECONDS));
       setCleanupInterval(
           MetastoreConf.getTimeVar(conf, ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, TimeUnit.MILLISECONDS));
+      setWaitInterval(MetastoreConf
+          .getTimeVar(conf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, TimeUnit.MILLISECONDS));
     }
 
     @Override
     public void run() {
+      LOG.info("Wait interval is {}", waitInterval);
+      if (waitInterval > 0) {
+        try {
+          LOG.info("Cleaner Thread Restarted and {} or {} is configured. So cleaner thread will startup post waiting "
+                  + "{} ms", EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL,
+              EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL.getHiveName(), waitInterval);
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.error("Failed during the initial wait before start.", e);
+          if(isInTest) {
+            Thread.currentThread().interrupt();
+          }
+          return;
+        }
+        LOG.info("Completed Cleaner thread initial wait. Starting normal processing.");
+      }
+
       while (true) {
         LOG.debug("Cleaner thread running");
         try {
@@ -1448,5 +1478,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     public void setCleanupInterval(long configInterval) {
       sleepTime = configInterval;
     }
+
+    public void setWaitInterval(long waitInterval) {
+      this.waitInterval = waitInterval;
+    }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
index 19a56de..dc22be2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -36,6 +36,7 @@ public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
       new ReplicationV1CompatRule(metaStoreClient, hconf, new ArrayList<String>() {{
           add("testEventFilters");
           add("testReplConfiguredCleanupOfNotificationEvents");
+          add("testCleanerThreadStartupWait");
       }});
 
   @BeforeClass
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 62f1dd9..61f61a8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -137,6 +137,8 @@ import org.apache.logging.log4j.LogManager;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
@@ -252,6 +254,8 @@ public class TestReplicationScenarios {
     metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror);
 
     PersistenceManagerProvider.setTwoMetastoreTesting(true);
+    MetastoreConf.setTimeVar(hconf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconfMirror, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 0, TimeUnit.SECONDS);
   }
 
   @AfterClass
@@ -2437,7 +2441,7 @@ public class TestReplicationScenarios {
 
     // For next run, CM is enabled, set REPL_EVENT_DB_LISTENER_TTL to low value for events to get deleted
     MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60, TimeUnit.SECONDS);
-    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds , TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds , TimeUnit.SECONDS);
     DbNotificationListener.resetCleaner(hconf);
 
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
@@ -2465,7 +2469,7 @@ public class TestReplicationScenarios {
     // First check with high ttl
     MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false);
     MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds  * 60 * 60, TimeUnit.SECONDS);
-    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS);
     DbNotificationListener.resetCleaner(hconf);
 
     run("CREATE TABLE " + dbName
@@ -2491,7 +2495,7 @@ public class TestReplicationScenarios {
     //With CM disabled, set a low ttl for events to get deleted
     MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, false);
     MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds, TimeUnit.SECONDS);
-    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds   * 60 * 60, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds   * 60 * 60, TimeUnit.SECONDS);
     DbNotificationListener.resetCleaner(hconf);
 
     run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
@@ -2514,13 +2518,73 @@ public class TestReplicationScenarios {
     //restore original values
     MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED, true);
     MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 86400, TimeUnit.SECONDS);
-    MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS);
+    MetastoreConf.setTimeVar(hconf, REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS);
     MetastoreConf.setTimeVar(hconf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 7200, TimeUnit.SECONDS);
     DbNotificationListener.resetCleaner(hconf);
     verifySetupSteps = verifySetupOriginal;
   }
 
   @Test
+  public void testCleanerThreadStartupWait() throws Exception {
+    int eventsTtl = 20;
+    HiveConf newConf = new HiveConf(hconf);
+
+    // Set TTL short enough for testing.
+    MetastoreConf.setTimeVar(newConf, REPL_EVENT_DB_LISTENER_TTL, eventsTtl, TimeUnit.SECONDS);
+
+    // Set startup wait interval.
+    MetastoreConf.setTimeVar(newConf, EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, eventsTtl * 5, TimeUnit.SECONDS);
+
+    // Set a short sleep interval between cleaner runs.
+    MetastoreConf
+        .setTimeVar(newConf, MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 10, TimeUnit.MILLISECONDS);
+    newConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true);
+    // Reset Cleaner to have an initial wait time.
+    DbNotificationListener.resetCleaner(newConf);
+
+    IMetaStoreClient msClient = metaStoreClient;
+    run("create database referenceDb", driver);
+
+    long firstEventId = msClient.getCurrentNotificationEventId().getEventId();;
+
+    run("create database cleanupStartup", driver);
+    run("drop database cleanupStartup", driver);
+
+    LOG.info("Done with creating events.");
+
+    // Check events are pushed into notification logs.
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    // Reset Cleaner to have an initial wait time.
+    DbNotificationListener.resetCleaner(newConf);
+
+    // Sleep for eventsTtl time and see if events are there.
+    Thread.sleep(eventsTtl * 1000);
+
+    // Check events are there in notification logs.
+    rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    // Sleep for some more time and see if events are there.
+    Thread.sleep(eventsTtl * 1000);
+
+    rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, rsp.getEventsSize());
+
+    // Sleep more than the initial wait time and see if Events get cleaned up post that
+    Thread.sleep(eventsTtl * 4000);
+
+    // Events should have cleaned up.
+    rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    // Reset with original configuration.
+    DbNotificationListener.resetCleaner(hconf);
+    run("drop database referenceDb", driver);
+  }
+
+  @Test
   public void testIncrementalInsertToPartition() throws IOException {
     String testName = "incrementalInsertToPartition";
     String dbName = createDB(testName, driver);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index e42493b..1738ab4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -752,6 +752,9 @@ public class MetastoreConf {
     EVENT_DB_LISTENER_CLEAN_INTERVAL("metastore.event.db.listener.clean.interval",
             "hive.metastore.event.db.listener.clean.interval", 7200, TimeUnit.SECONDS,
             "sleep interval between each run for cleanup of events from the database listener queue"),
+    EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL("metastore.event.db.listener.clean.startup.wait.interval",
+        "hive.metastore.event.db.listener.clean.startup.wait.interval", 1, TimeUnit.DAYS,
+        "Wait interval post start of metastore after which the cleaner thread starts to work"),
     EVENT_DB_NOTIFICATION_API_AUTH("metastore.metastore.event.db.notification.api.auth",
         "hive.metastore.event.db.notification.api.auth", true,
         "Should metastore do authorization against database notification related APIs such as get_next_notification.\n" +