You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/02/21 16:59:11 UTC

hive git commit: HIVE-18259: Automatic cleanup of invalidation cache for materialized views (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 102731f6e -> 0b03b819f


HIVE-18259: Automatic cleanup of invalidation cache for materialized views (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b03b819
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b03b819
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b03b819

Branch: refs/heads/master
Commit: 0b03b819f953905d28778c66bf10ea4f1fe47976
Parents: 102731f
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Jan 22 18:42:48 2018 -0800
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Wed Feb 21 08:58:59 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  13 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   5 +-
 .../MaterializationsCacheCleanerTask.java       |  63 ++++
 .../MaterializationsInvalidationCache.java      | 146 +++++++-
 .../hive/metastore/conf/MetastoreConf.java      |  19 +-
 ...stMetaStoreMaterializationsCacheCleaner.java | 331 +++++++++++++++++++
 6 files changed, 566 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 38f6430..169ddcb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1161,7 +1161,7 @@ public class HiveConf extends Configuration {
     // materialized views
     HIVE_MATERIALIZED_VIEW_ENABLE_AUTO_REWRITING("hive.materializedview.rewriting", false,
         "Whether to try to rewrite queries using the materialized views enabled for rewriting"),
-    HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", 0,
+    HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW("hive.materializedview.rewriting.time.window", "0s", new TimeValidator(TimeUnit.SECONDS),
         "Time window, specified in seconds, after which outdated materialized views become invalid for automatic query rewriting.\n" +
         "For instance, if a materialized view is created and afterwards one of its source tables is changed at " +
         "moment in time t0, the materialized view will not be considered for rewriting anymore after t0 plus " +
@@ -1172,6 +1172,17 @@ public class HiveConf extends Configuration {
         "Default file format for CREATE MATERIALIZED VIEW statement"),
     HIVE_MATERIALIZED_VIEW_SERDE("hive.materializedview.serde",
         "org.apache.hadoop.hive.ql.io.orc.OrcSerde", "Default SerDe used for materialized views"),
+    HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_IMPL("hive.metastore.materializations.invalidation.impl", "DEFAULT",
+        new StringSet("DEFAULT", "DISABLE"),
+        "The implementation that we should use for the materializations invalidation cache. \n" +
+            "  DEFAULT: Default implementation for invalidation cache\n" +
+            "  DISABLE: Disable invalidation cache (debugging purposes)"),
+    HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("hive.metastore.materializations.invalidation.clean.frequency",
+        "3600s", new TimeValidator(TimeUnit.SECONDS), "Frequency at which timer task runs to remove unnecessary transaction entries from " +
+        "materializations invalidation cache."),
+    HIVE_MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("hive.metastore.materializations.invalidation.max.duration",
+        "86400s", new TimeValidator(TimeUnit.SECONDS), "Maximum duration for query producing a materialization. After this time, transaction " +
+        "entries that are not relevant for materializations can be removed from invalidation cache."),
 
     // hive.mapjoin.bucket.cache.size has been replaced by hive.smbjoin.cache.row,
     // need to remove by hive .13. Also, do not change default (see SMB operator)

http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7b7e140..9c3b54f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -1293,7 +1294,9 @@ public class Hive {
    * @throws HiveException
    */
   public List<RelOptMaterialization> getValidMaterializedViews() throws HiveException {
-    final long diff = conf.getIntVar(HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) * 1000;
+    final long diff =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+            TimeUnit.MILLISECONDS);
     final long minTime = System.currentTimeMillis() - diff;
     try {
       // Final result

http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
new file mode 100644
index 0000000..cc168a9
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Task that periodically removes transaction entries that are no longer useful from the
+ * materializations invalidation cache.
+ */
+public class MaterializationsCacheCleanerTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class);
+
+  private Configuration conf;
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit);
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void run() {
+    long expiryMs = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS);
+    long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() - expiryMs);
+    if (removedCnt > 0) {
+      // Parameterized logging: the message is only formatted when DEBUG is enabled.
+      LOG.debug("Number of transaction entries deleted from materializations cache: {}",
+          removedCnt);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
index 92653ae..6e54eb9 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -28,10 +31,13 @@ import java.util.concurrent.Executors;
 
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
 import org.apache.hadoop.hive.metastore.api.Materialization;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +60,9 @@ public final class MaterializationsInvalidationCache {
   /* Singleton */
   private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache();
 
+  /* If true, this class has no functionality (debugging only); volatile since cleanup() runs on a task thread. */
+  private volatile boolean disable;
+
   /* Key is the database name. Each value is a map from the unique view qualified name to
    * the materialization invalidation info. This invalidation object contains information
    * such as the tables used by the materialized view or the invalidation time, i.e., first
@@ -62,10 +71,10 @@ public final class MaterializationsInvalidationCache {
       new ConcurrentHashMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>>();
 
   /*
-   * Key is a qualified table name. The value is a (sorted) tree set (supporting concurrent
-   * modifications) that will keep the modifications for a given table in the order that they
-   * happen. This is useful to quickly check the invalidation time for a given materialized
-   * view. 
+   * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent
+   * modifications) that will keep the modifications for a given table in the order of their
+   * transaction id. This is useful to quickly check the invalidation time for a given
+   * materialization.
    */
   private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications =
       new ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Long>>();
@@ -100,6 +109,14 @@ public final class MaterializationsInvalidationCache {
     this.store = store;
     this.txnStore = txnStore;
 
+    // This will only be true for debugging purposes
+    this.disable = MetastoreConf.getVar(store.getConf(),
+        MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE");
+    if (disable) {
+      // Nothing to do
+      return;
+    }
+
     if (!initialized) {
       this.initialized = true;
       ExecutorService pool = Executors.newCachedThreadPool();
@@ -162,6 +179,10 @@ public final class MaterializationsInvalidationCache {
    */
   private void addMaterializedView(String dbName, String tableName, Set<String> tablesUsed,
       String validTxnList, OpType opType) {
+    if (disable) {
+      // Nothing to do
+      return;
+    }
     // We are going to create the map for each view in the given database
     ConcurrentMap<String, MaterializationInvalidationInfo> cq =
         new ConcurrentHashMap<String, MaterializationInvalidationInfo>();
@@ -225,6 +246,10 @@ public final class MaterializationsInvalidationCache {
    */
   public void notifyTableModification(String dbName, String tableName,
       long txnId, long newModificationTime) {
+    if (disable) {
+      // Nothing to do
+      return;
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}",
           tableName, dbName, txnId, newModificationTime);
@@ -246,6 +271,10 @@ public final class MaterializationsInvalidationCache {
    * @param tableName
    */
   public void dropMaterializedView(String dbName, String tableName) {
+    if (disable) {
+      // Nothing to do
+      return;
+    }
     materializations.get(dbName).remove(tableName);
   }
 
@@ -308,12 +337,12 @@ public final class MaterializationsInvalidationCache {
     ValidTxnList txnList = new ValidReadTxnList(txnListString);
     long firstModificationTimeAfterCreation = 0L;
     for (String qNameTableUsed : materialization.getTablesUsed()) {
-      final Long tn = tableModifications.get(qNameTableUsed)
-          .higherKey(txnList.getHighWatermark());
+      final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
+          .higherEntry(txnList.getHighWatermark());
       if (tn != null) {
         if (firstModificationTimeAfterCreation == 0L ||
-            tn < firstModificationTimeAfterCreation) {
-          firstModificationTimeAfterCreation = tn;
+            tn.getValue() < firstModificationTimeAfterCreation) {
+          firstModificationTimeAfterCreation = tn.getValue();
         }
       }
       // Min open txn might be null if there were no open transactions
@@ -346,4 +375,105 @@ public final class MaterializationsInvalidationCache {
     ALTER
   }
 
+  /**
+   * Removes transaction events that are not relevant anymore.
+   * @param minTime events generated before this time (ms) can be deleted from the cache
+   * @return number of events that were deleted from the cache
+   */
+  public long cleanup(long minTime) {
+    // To remove a transaction entry, it should meet two conditions:
+    // 1) Its modification time is older than minTime, and
+    // 2) It should not be the transaction that invalidates a materialization
+    if (disable || !initialized) {
+      // Bail out
+      return 0L;
+    }
+    // We execute the cleanup in two steps
+    // First we gather all the transactions that need to be kept
+    final Multimap<String, Long> keepTxnInfos = HashMultimap.create();
+    for (ConcurrentMap<String, MaterializationInvalidationInfo> views : materializations.values()) {
+      for (MaterializationInvalidationInfo m : views.values()) {
+        ValidTxnList txnList = new ValidReadTxnList(m.getValidTxnList());
+        boolean canBeDeleted = false;
+        String currentTableForInvalidatingTxn = null;
+        long currentInvalidatingTxnId = 0L;
+        long currentInvalidatingTxnTime = 0L;
+        for (String qNameTableUsed : m.getTablesUsed()) {
+          final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
+              .higherEntry(txnList.getHighWatermark());
+          if (tn != null) {
+            if (currentInvalidatingTxnTime == 0L ||
+                tn.getValue() < currentInvalidatingTxnTime) {
+              // This transaction 1) is the first one examined for this materialization, or
+              // 2) it is the invalidating transaction. Hence we add it to the transactions to keep.
+              // 1.- We remove the previous invalidating transaction from the transactions
+              // to be kept (if needed).
+              if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
+                keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId);
+              }
+              // 2.- We add this transaction to the transactions that should be kept.
+              canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(tn.getKey());
+              keepTxnInfos.put(qNameTableUsed, tn.getKey());
+              // 3.- We record this transaction as the current invalidating transaction.
+              currentTableForInvalidatingTxn = qNameTableUsed;
+              currentInvalidatingTxnId = tn.getKey();
+              currentInvalidatingTxnTime = tn.getValue();
+            }
+          }
+          if (txnList.getMinOpenTxn() != null) {
+            // Invalid list is sorted. NOTE(review): a key absent from it exhausts pos and ends the scan
+            int pos = 0;
+            for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
+                .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) {
+              while (pos < txnList.getInvalidTransactions().length &&
+                  txnList.getInvalidTransactions()[pos] != t.getKey()) {
+                pos++;
+              }
+              if (pos >= txnList.getInvalidTransactions().length) {
+                break;
+              }
+              if (currentInvalidatingTxnTime == 0L ||
+                  t.getValue() < currentInvalidatingTxnTime) {
+                // This transaction 1) is the first one examined for this materialization, or
+                // 2) it is the invalidating transaction. Hence we add it to the transactions to keep.
+                // 1.- We remove the previous invalidating transaction from the transactions
+                // to be kept (if needed).
+                if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
+                  keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId);
+                }
+                // 2.- We add this transaction to the transactions that should be kept.
+                canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(t.getKey());
+                keepTxnInfos.put(qNameTableUsed, t.getKey());
+                // 3.- We record this transaction as the current invalidating transaction.
+                currentTableForInvalidatingTxn = qNameTableUsed;
+                currentInvalidatingTxnId = t.getKey();
+                currentInvalidatingTxnTime = t.getValue();
+              }
+            }
+          }
+        }
+      }
+    }
+    // Second, we remove the transactions
+    long removed = 0L;
+    for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) {
+      Collection<Long> c = keepTxnInfos.get(e.getKey());
+      for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) {
+        Entry<Long, Long> v = it.next();
+        // We need to check again the time because some of the transactions might not be explored
+        // above, e.g., transactions above the highest transaction mark for all the materialized
+        // views.
+        if (v.getValue() < minTime && !c.contains(v.getKey())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}",
+                e.getKey(), v.getKey(), v.getValue());
+          }
+          it.remove();
+          removed++;
+        }
+      }
+    }
+    return removed;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 699a649..9f82256 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader;
 import org.apache.hadoop.hive.metastore.HiveAlterHandler;
+import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
 import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
@@ -553,6 +554,20 @@ public class MetastoreConf {
         "javax.jdo.PersistenceManagerFactoryClass",
         "org.datanucleus.api.jdo.JDOPersistenceManagerFactory",
         "class implementing the jdo persistence"),
+    MATERIALIZATIONS_INVALIDATION_CACHE_IMPL("metastore.materializations.invalidation.impl",
+        "hive.metastore.materializations.invalidation.impl", "DEFAULT",
+        new Validator.StringSet("DEFAULT", "DISABLE"),
+        "The implementation that we should use for the materializations invalidation cache. \n" +
+            "  DEFAULT: Default implementation for invalidation cache\n" +
+            "  DISABLE: Disable invalidation cache (debugging purposes)"),
+    MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY("metastore.materializations.invalidation.clean.frequency",
+        "hive.metastore.materializations.invalidation.clean.frequency",
+        3600, TimeUnit.SECONDS, "Frequency at which timer task runs to remove unnecessary transaction entries from " +
+        "materializations invalidation cache."),
+    MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION("metastore.materializations.invalidation.max.duration",
+        "hive.metastore.materializations.invalidation.max.duration",
+        86400, TimeUnit.SECONDS, "Maximum duration for query producing a materialization. After this time, transaction " +
+        "entries that are not relevant for materializations can be removed from invalidation cache."),
     // Parameters for exporting metadata on table drop (requires the use of the)
     // org.apache.hadoop.hive.ql.parse.MetaDataExportListener preevent listener
     METADATA_EXPORT_LOCATION("metastore.metadata.export.location", "hive.metadata.export.location",
@@ -708,7 +723,9 @@ public class MetastoreConf {
             + "The only supported special character right now is '/'. This flag applies only to quoted table names.\n"
             + "The default value is true."),
     TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
-        EventCleanerTask.class.getName() + "," + "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask",
+        EventCleanerTask.class.getName() + "," +
+        "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
+        MaterializationsCacheCleanerTask.class.getName(),
         "Comma separated list of tasks that will be started in separate threads.  These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode.  They must implement " + MetastoreTaskThread.class.getName()),

http://git-wip-us.apache.org/repos/asf/hive/blob/0b03b819/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java
new file mode 100644
index 0000000..8f8bff6
--- /dev/null
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMaterializationsCacheCleaner.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.junit.Assert;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache}.
+ * The tests focus on arrival of notifications (possibly out of order) and the logic
+ * to clean up the materializations cache. Tests need to be executed in a certain order
+ * to avoid interactions among them, as the invalidation cache is a singleton.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestMetaStoreMaterializationsCacheCleaner {
+
+  private static final String DB_NAME = "hive3252";
+  private static final String TBL_NAME_1 = "tmptbl1";
+  private static final String TBL_NAME_2 = "tmptbl2";
+  private static final String TBL_NAME_3 = "tmptbl3";
+  private static final String MV_NAME_1 = "mv1";
+  private static final String MV_NAME_2 = "mv2";
+
+
+  @Test
+  public void testCleanerScenario1() throws Exception {
+    // create mock raw store
+    final RawStore rawStore = mock(RawStore.class);
+    when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of());
+    Configuration conf = new Configuration();
+    conf.set("metastore.materializations.invalidation.impl", "DISABLE");
+    when(rawStore.getConf()).thenReturn(conf);
+    // create mock txn store
+    final TxnStore txnStore = mock(TxnStore.class);
+    // initialize invalidation cache (set conf to disable)
+    MaterializationsInvalidationCache.get().init(rawStore, txnStore);
+
+    // The invalidation cache is disabled, so it is not supposed to record any
+    // information: lookups below return nothing and cleanups remove nothing.
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_1, 1, 1);
+    int id = 2;
+    BasicTxnInfo txn2 = createTxnInfo(DB_NAME, TBL_NAME_1, id);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_1, id, id);
+    // Create tbl2 (nothing to do)
+    id = 3;
+    BasicTxnInfo txn3 = createTxnInfo(DB_NAME, TBL_NAME_1, id);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_2, id, id);
+    // Cleanup (minTime = 0) -> cache is disabled, nothing is removed
+    long removed = MaterializationsInvalidationCache.get().cleanup(0L);
+    Assert.assertEquals(0L, removed);
+    // Create mv1
+    Table mv1 = mock(Table.class);
+    when(mv1.getDbName()).thenReturn(DB_NAME);
+    when(mv1.getTableName()).thenReturn(MV_NAME_1);
+    CreationMetadata mockCM1 = new CreationMetadata(
+        DB_NAME, MV_NAME_1,
+        ImmutableSet.of(
+            DB_NAME + "." + TBL_NAME_1,
+            DB_NAME + "." + TBL_NAME_2));
+    // Create txn list (highWatermark=3;minOpenTxn=Long.MAX_VALUE)
+    mockCM1.setValidTxnList("3:" + Long.MAX_VALUE + "::");
+    when(mv1.getCreationMetadata()).thenReturn(mockCM1);
+    MaterializationsInvalidationCache.get().createMaterializedView(mockCM1.getDbName(), mockCM1.getTblName(),
+        mockCM1.getTablesUsed(), mockCM1.getValidTxnList());
+    Map<String, Materialization> invalidationInfos =
+        MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+            DB_NAME, ImmutableList.of(MV_NAME_1));
+    Assert.assertTrue(invalidationInfos.isEmpty());
+    id = 10;
+    BasicTxnInfo txn10 = createTxnInfo(DB_NAME, TBL_NAME_2, id);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_2, id, id);
+    id = 9;
+    BasicTxnInfo txn9 = createTxnInfo(DB_NAME, TBL_NAME_1, id);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_1, id, id);
+    // Cleanup (minTime = 8) -> cache is disabled, nothing is removed
+    removed = MaterializationsInvalidationCache.get().cleanup(8L);
+    Assert.assertEquals(0L, removed);
+    invalidationInfos =
+        MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+            DB_NAME, ImmutableList.of(MV_NAME_1));
+    Assert.assertTrue(invalidationInfos.isEmpty());
+    // Create mv2
+    Table mv2 = mock(Table.class);
+    when(mv2.getDbName()).thenReturn(DB_NAME);
+    when(mv2.getTableName()).thenReturn(MV_NAME_2);
+    CreationMetadata mockCM2 = new CreationMetadata(
+        DB_NAME, MV_NAME_2,
+        ImmutableSet.of(
+            DB_NAME + "." + TBL_NAME_1,
+            DB_NAME + "." + TBL_NAME_2));
+    // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE)
+    mockCM2.setValidTxnList("10:" + Long.MAX_VALUE + "::");
+    when(mv2.getCreationMetadata()).thenReturn(mockCM2);
+    MaterializationsInvalidationCache.get().createMaterializedView(mockCM2.getDbName(), mockCM2.getTblName(),
+        mockCM2.getTablesUsed(), mockCM2.getValidTxnList());
+    when(mv2.getCreationMetadata()).thenReturn(mockCM2);
+    invalidationInfos =
+        MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+            DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertTrue(invalidationInfos.isEmpty());
+    // Create tbl3 (nothing to do)
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_3, 11, 11);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_3, 18, 18);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_1, 14, 14);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_1, 17, 17);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_2, 16, 16);
+    // Cleanup (minTime = 16) -> cache is disabled, nothing is removed
+    removed = MaterializationsInvalidationCache.get().cleanup(16L);
+    Assert.assertEquals(0L, removed);
+    invalidationInfos =
+        MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+            DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertTrue(invalidationInfos.isEmpty());
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_1, 12, 12);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_2, 15, 15);
+    MaterializationsInvalidationCache.get().notifyTableModification(
+        DB_NAME, TBL_NAME_2, 7, 7);
+    invalidationInfos =
+        MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+            DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertTrue(invalidationInfos.isEmpty());
+    // Cleanup (minTime = 20) -> cache is disabled, nothing is removed
+    removed = MaterializationsInvalidationCache.get().cleanup(20L);
+    Assert.assertEquals(0L, removed);
+    invalidationInfos =
+        MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(
+            DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertTrue(invalidationInfos.isEmpty());
+    // Cleanup (minTime = 24) -> cache is disabled, nothing is removed
+    removed = MaterializationsInvalidationCache.get().cleanup(24L);
+    Assert.assertEquals(0L, removed);
+  }
+
+  /**
+   * Runs a scenario with two materialized views over tbl1/tbl2 (plus modifications on the
+   * unrelated tbl3). After each step it checks how many cached transactions
+   * {@link MaterializationsInvalidationCache#cleanup(long)} removes and the invalidation time
+   * reported for each materialized view.
+   */
+  @Test
+  public void testCleanerScenario2() throws Exception {
+    // create mock raw store
+    final RawStore rawStore = mock(RawStore.class);
+    when(rawStore.getAllDatabases()).thenReturn(ImmutableList.of());
+    Configuration conf = new Configuration();
+    conf.set("metastore.materializations.invalidation.impl", "DEFAULT");
+    when(rawStore.getConf()).thenReturn(conf);
+    // create mock txn store
+    final TxnStore txnStore = mock(TxnStore.class);
+    // initialize invalidation cache (set conf to default)
+    final MaterializationsInvalidationCache cache = MaterializationsInvalidationCache.get();
+    cache.init(rawStore, txnStore);
+
+    // Scenario consists of the following steps:
+    // Create tbl1
+    // (t = 1) Insert row in tbl1
+    // (t = 2) Insert row in tbl1
+    // Create tbl2
+    // (t = 3) Insert row in tbl2
+    // Cleanup (current = 4, duration = 4) -> Does nothing
+    // Create mv1
+    // (t = 10) Insert row in tbl2
+    // (t = 9) Insert row in tbl1 (out of order)
+    // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3
+    // Create mv2
+    // Create tbl3
+    // (t = 11) Insert row in tbl3
+    // (t = 18) Insert row in tbl3
+    // (t = 14) Insert row in tbl1
+    // (t = 17) Insert row in tbl1
+    // (t = 16) Insert row in tbl2
+    // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11
+    // (t = 12) Insert row in tbl1
+    // (t = 15) Insert row in tbl2
+    // (t = 7) Insert row in tbl2
+    // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18
+    // Cleanup (current = 28, duration = 4) -> Does nothing
+    // Each cleanup call below receives (current - duration) as its minimum time.
+
+    // Create tbl1 (nothing to do)
+    // (t = 1) and (t = 2) Insert rows in tbl1
+    cache.notifyTableModification(DB_NAME, TBL_NAME_1, 1, 1);
+    cache.notifyTableModification(DB_NAME, TBL_NAME_1, 2, 2);
+    // Create tbl2 (nothing to do)
+    // (t = 3) Insert row in tbl2
+    cache.notifyTableModification(DB_NAME, TBL_NAME_2, 3, 3);
+    // Cleanup (current = 4, duration = 4) -> Does nothing
+    long removed = cache.cleanup(0L);
+    Assert.assertEquals(0L, removed);
+
+    // Create mv1 on tbl1 and tbl2
+    CreationMetadata mv1Metadata = new CreationMetadata(
+        DB_NAME, MV_NAME_1,
+        ImmutableSet.of(
+            DB_NAME + "." + TBL_NAME_1,
+            DB_NAME + "." + TBL_NAME_2));
+    // Create txn list (highWatermark=3;minOpenTxn=Long.MAX_VALUE)
+    mv1Metadata.setValidTxnList("3:" + Long.MAX_VALUE + "::");
+    cache.createMaterializedView(mv1Metadata.getDbName(), mv1Metadata.getTblName(),
+        mv1Metadata.getTablesUsed(), mv1Metadata.getValidTxnList());
+    Map<String, Materialization> invalidationInfos =
+        cache.getMaterializationInvalidationInfo(DB_NAME, ImmutableList.of(MV_NAME_1));
+    Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+    // (t = 10) Insert row in tbl2
+    cache.notifyTableModification(DB_NAME, TBL_NAME_2, 10, 10);
+    // (t = 9) Insert row in tbl1 (out of order)
+    cache.notifyTableModification(DB_NAME, TBL_NAME_1, 9, 9);
+    // Cleanup (current = 12, duration = 4) -> Removes txn1, txn2, txn3
+    removed = cache.cleanup(8L);
+    Assert.assertEquals(3L, removed);
+    invalidationInfos =
+        cache.getMaterializationInvalidationInfo(DB_NAME, ImmutableList.of(MV_NAME_1));
+    Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+
+    // Create mv2 on tbl1 and tbl2
+    CreationMetadata mv2Metadata = new CreationMetadata(
+        DB_NAME, MV_NAME_2,
+        ImmutableSet.of(
+            DB_NAME + "." + TBL_NAME_1,
+            DB_NAME + "." + TBL_NAME_2));
+    // Create txn list (highWatermark=10;minOpenTxn=Long.MAX_VALUE)
+    mv2Metadata.setValidTxnList("10:" + Long.MAX_VALUE + "::");
+    cache.createMaterializedView(mv2Metadata.getDbName(), mv2Metadata.getTblName(),
+        mv2Metadata.getTablesUsed(), mv2Metadata.getValidTxnList());
+    invalidationInfos = cache.getMaterializationInvalidationInfo(
+        DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+    Assert.assertEquals(0L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+    // Create tbl3 (nothing to do)
+    // (t = 11) and (t = 18) Insert rows in tbl3
+    cache.notifyTableModification(DB_NAME, TBL_NAME_3, 11, 11);
+    cache.notifyTableModification(DB_NAME, TBL_NAME_3, 18, 18);
+    // (t = 14) and (t = 17) Insert rows in tbl1
+    cache.notifyTableModification(DB_NAME, TBL_NAME_1, 14, 14);
+    cache.notifyTableModification(DB_NAME, TBL_NAME_1, 17, 17);
+    // (t = 16) Insert row in tbl2
+    cache.notifyTableModification(DB_NAME, TBL_NAME_2, 16, 16);
+    // Cleanup (current = 20, duration = 4) -> Removes txn10, txn11
+    removed = cache.cleanup(16L);
+    Assert.assertEquals(2L, removed);
+    invalidationInfos = cache.getMaterializationInvalidationInfo(
+        DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertEquals(9L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+    Assert.assertEquals(14L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+    // (t = 12) Insert row in tbl1 (out of order)
+    cache.notifyTableModification(DB_NAME, TBL_NAME_1, 12, 12);
+    // (t = 15) and (t = 7) Insert rows in tbl2 (out of order)
+    cache.notifyTableModification(DB_NAME, TBL_NAME_2, 15, 15);
+    cache.notifyTableModification(DB_NAME, TBL_NAME_2, 7, 7);
+    invalidationInfos = cache.getMaterializationInvalidationInfo(
+        DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+    Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+    // Cleanup (current = 24, duration = 4) -> Removes txn9, txn14, txn15, txn16, txn17, txn18
+    removed = cache.cleanup(20L);
+    Assert.assertEquals(6L, removed);
+    invalidationInfos = cache.getMaterializationInvalidationInfo(
+        DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+    Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+    // Cleanup (current = 28, duration = 4) -> Does nothing; since no transaction is removed,
+    // the invalidation times reported for both views must stay the same
+    removed = cache.cleanup(24L);
+    Assert.assertEquals(0L, removed);
+    invalidationInfos = cache.getMaterializationInvalidationInfo(
+        DB_NAME, ImmutableList.of(MV_NAME_1, MV_NAME_2));
+    Assert.assertEquals(7L, invalidationInfos.get(MV_NAME_1).getInvalidationTime());
+    Assert.assertEquals(12L, invalidationInfos.get(MV_NAME_2).getInvalidationTime());
+  }
+
+  private static BasicTxnInfo createTxnInfo(String dbName, String tableName, int i) {
+    BasicTxnInfo r = new BasicTxnInfo();
+    r.setDbname(dbName);
+    r.setTablename(tableName);
+    r.setTxnid(i);
+    r.setTime(i);
+    return r;
+  }
+}