Posted to commits@hive.apache.org by jc...@apache.org on 2019/10/11 17:15:15 UTC

[hive] branch master updated: HIVE-21344: CBO: Reduce compilation time in presence of materialized views (Jesus Camacho Rodriguez, reviewed by Vineet Garg)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f16509a  HIVE-21344: CBO: Reduce compilation time in presence of materialized views (Jesus Camacho Rodriguez, reviewed by Vineet Garg)
f16509a is described below

commit f16509a5c9187f592c48c253ee001fc3a5e0d508
Author: Jesus Camacho Rodriguez <jc...@apache.org>
AuthorDate: Fri Oct 11 10:14:12 2019 -0700

    HIVE-21344: CBO: Reduce compilation time in presence of materialized views (Jesus Camacho Rodriguez, reviewed by Vineet Garg)
    
    Close apache/hive#749
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   4 +
 .../org/apache/hadoop/hive/ql/log/PerfLogger.java  |   1 +
 .../ql/ddl/view/DropMaterializedViewOperation.java |   2 +-
 .../ddl/view/MaterializedViewUpdateOperation.java  |   2 +
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 240 +++++++++++++++----
 .../ql/metadata/HiveMaterializedViewsRegistry.java | 256 +++++++++++++++------
 .../hadoop/hive/ql/parse/CalcitePlanner.java       | 204 ++++++++++------
 .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java  |  18 +-
 .../materialized_view_partitioned_2.q              |   4 +-
 .../results/clientnegative/strict_pruning_2.q.out  |   2 +-
 .../beeline/materialized_view_create_rewrite.q.out |   6 +
 .../llap/materialized_view_create_rewrite.q.out    |   6 +
 .../llap/materialized_view_create_rewrite_4.q.out  |   6 +
 .../materialized_view_create_rewrite_dummy.q.out   |  12 +
 ...erialized_view_create_rewrite_time_window.q.out |   6 +
 .../llap/materialized_view_partitioned_2.q.out     |  14 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  57 +++--
 17 files changed, 613 insertions(+), 227 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index afee315..3b71164 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3255,6 +3255,10 @@ public class HiveConf extends Configuration {
         "The implementation that we should use for the materialized views registry. \n" +
         "  DEFAULT: Default cache for materialized views\n" +
         "  DUMMY: Do not cache materialized views and hence forward requests to metastore"),
+    HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH("hive.server2.materializedviews.registry.refresh.period", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Period, specified in seconds, between successive refreshes of the registry to pull new materializations " +
+        "from the metastore that may have been created by other HS2 instances."),
 
     // HiveServer2 WebUI
     HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
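
The new property above is consumed through HiveConf.getTimeVar, as the registry
code further down in this patch does. A minimal sketch of setting and reading it
(the class and variable names here are illustrative, not part of the patch):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class RefreshPeriodSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Override the default 60s refresh period, e.g., to two minutes
        conf.set("hive.server2.materializedviews.registry.refresh.period", "120s");
        // Read it back the same way HiveMaterializedViewsRegistry does below
        long periodSeconds = HiveConf.getTimeVar(conf,
            HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH,
            TimeUnit.SECONDS);
        System.out.println("Registry refresh period: " + periodSeconds + "s");
      }
    }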
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index a9cb009..2707987 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -43,6 +43,7 @@ public class PerfLogger {
   public static final String PARSE = "parse";
   public static final String ANALYZE = "semanticAnalyze";
   public static final String OPTIMIZER = "optimizer";
+  public static final String MATERIALIZED_VIEWS_REGISTRY_REFRESH = "MaterializedViewsRegistryRefresh";
   public static final String DO_AUTHORIZATION = "doAuthorization";
   public static final String DRIVER_EXECUTE = "Driver.execute";
   public static final String INPUT_SUMMARY = "getInputSummary";
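
The new MATERIALIZED_VIEWS_REGISTRY_REFRESH key is used to time each background
refresh of the registry (see the Loader changes below). A minimal sketch of the
begin/end pattern, assuming the same PerfLogger API the patch uses; the
try/finally form here is an equivalent variant of what the Loader does:

    import org.apache.hadoop.hive.ql.log.PerfLogger;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public final class RefreshTimingSketch {
      private static final String CLASS_NAME = RefreshTimingSketch.class.getName();

      static void timedRefresh(Runnable refreshWork) {
        PerfLogger perfLogger = SessionState.getPerfLogger();
        perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.MATERIALIZED_VIEWS_REGISTRY_REFRESH);
        try {
          refreshWork.run(); // e.g., reload materializations from the metastore
        } finally {
          perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.MATERIALIZED_VIEWS_REGISTRY_REFRESH);
        }
      }
    }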
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java
index de09a55..21b456a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/DropMaterializedViewOperation.java
@@ -54,7 +54,7 @@ public class DropMaterializedViewOperation extends DDLOperation<DropMaterialized
 
     // TODO: API w/catalog name
     context.getDb().dropTable(desc.getTableName(), false);
-    HiveMaterializedViewsRegistry.get().dropMaterializedView(table);
+    HiveMaterializedViewsRegistry.get().dropMaterializedView(table.getDbName(), table.getTableName());
     DDLUtils.addIfAbsentByName(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK), context);
 
     return 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java
index ad6e163..4154cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/MaterializedViewUpdateOperation.java
@@ -60,6 +60,8 @@ public class MaterializedViewUpdateOperation extends DDLOperation<MaterializedVi
             ImmutableSet.copyOf(mvTable.getCreationMetadata().getTablesUsed()));
         cm.setValidTxnList(context.getConf().get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
         context.getDb().updateCreationMetadata(mvTable.getDbName(), mvTable.getTableName(), cm);
+        mvTable.setCreationMetadata(cm);
+        HiveMaterializedViewsRegistry.get().createMaterializedView(context.getDb().getConf(), mvTable);
       }
     } catch (HiveException e) {
       LOG.debug("Exception during materialized view cache update", e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index db8cc6c..6143e85 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1614,6 +1614,150 @@ public class Hive {
 
   /**
    * Get the materialized views that have been enabled for rewriting from the
+   * cache (registry). It preprocesses them to discard those that are
+   * outdated and to augment those that need it, e.g., when incremental
+   * rewriting is enabled.
+   *
+   * @return the list of materialized views available for rewriting from the registry
+   * @throws HiveException
+   */
+  public List<RelOptMaterialization> getPreprocessedMaterializedViewsFromRegistry(
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    // From cache
+    List<RelOptMaterialization> materializedViews =
+        HiveMaterializedViewsRegistry.get().getRewritingMaterializedViews();
+    if (materializedViews.isEmpty()) {
+      // Bail out: empty list
+      return new ArrayList<>();
+    }
+    // Add to final result
+    return filterAugmentMaterializedViews(materializedViews, tablesUsed, txnMgr);
+  }
+
+  private List<RelOptMaterialization> filterAugmentMaterializedViews(List<RelOptMaterialization> materializedViews,
+        List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
+    final boolean tryIncrementalRewriting =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_INCREMENTAL);
+    final long defaultTimeWindow =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+            TimeUnit.MILLISECONDS);
+    try {
+      // Final result
+      List<RelOptMaterialization> result = new ArrayList<>();
+      for (RelOptMaterialization materialization : materializedViews) {
+        final RelNode viewScan = materialization.tableRel;
+        final Table materializedViewTable;
+        if (viewScan instanceof Project) {
+          // There is a Project on top (due to nullability)
+          materializedViewTable = ((RelOptHiveTable) viewScan.getInput(0).getTable()).getHiveTableMD();
+        } else {
+          materializedViewTable = ((RelOptHiveTable) viewScan.getTable()).getHiveTableMD();
+        }
+        final Boolean outdated = isOutdatedMaterializedView(materializedViewTable, currentTxnWriteIds,
+            defaultTimeWindow, tablesUsed, false);
+        if (outdated == null) {
+          continue;
+        }
+
+        final CreationMetadata creationMetadata = materializedViewTable.getCreationMetadata();
+        if (outdated) {
+          // The MV is outdated, see whether we should consider it for rewriting or not
+          if (!tryIncrementalRewriting) {
+            LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() +
+                " ignored for rewriting as its contents are outdated");
+            continue;
+          }
+          // We will rewrite it to include the filters on transaction list
+          // so we can produce partial rewritings.
+          // This would be costly since we are doing it for every materialized view
+          // that is outdated, but it only happens for more than one materialized view
+          // if rewriting with outdated materialized views is enabled (currently
+          // disabled by default).
+          materialization = augmentMaterializationWithTimeInformation(
+              materialization, validTxnsList, new ValidTxnWriteIdList(
+                  creationMetadata.getValidTxnList()));
+        }
+        result.add(materialization);
+      }
+      return result;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Validate that the materialized views retrieved from the registry are still up-to-date.
+   * For those that are not, the method loads them from the metastore into the registry.
+   *
+   * @return true if they are up-to-date, otherwise false
+   * @throws HiveException
+   */
+  public boolean validateMaterializedViewsFromRegistry(List<Table> cachedMaterializedViewTables,
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    final long defaultTimeWindow =
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW,
+            TimeUnit.MILLISECONDS);
+    final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
+    try {
+      // Final result
+      boolean result = true;
+      for (Table cachedMaterializedViewTable : cachedMaterializedViewTables) {
+        // Retrieve the materialized view table from the metastore
+        final Table materializedViewTable = getTable(
+            cachedMaterializedViewTable.getDbName(), cachedMaterializedViewTable.getTableName());
+        if (materializedViewTable == null || !materializedViewTable.isRewriteEnabled()) {
+          // This could happen if the materialized view has been deleted or rewriting has been disabled.
+          // We remove it from the registry and set result to false.
+          HiveMaterializedViewsRegistry.get().dropMaterializedView(cachedMaterializedViewTable);
+          result = false;
+        } else {
+          final Boolean outdated = isOutdatedMaterializedView(cachedMaterializedViewTable,
+              currentTxnWriteIds, defaultTimeWindow, tablesUsed, false);
+          if (outdated == null) {
+            result = false;
+            continue;
+          }
+          // If the cached materialized view was not outdated wrt the query snapshot,
+          // then we know that the metastore version should be either the same or
+          // more recent. If it is more recent, snapshot isolation will shield us
+          // from reading its contents after the snapshot was acquired, but we will
+          // update the registry so we have the most recent version.
+          // On the other hand, if the materialized view in the cache was outdated,
+          // we can only use it if the version that was in the cache is the same one
+          // that we can find in the metastore.
+          if (outdated) {
+            if (!cachedMaterializedViewTable.equals(materializedViewTable)) {
+              // We ignore and update the registry
+              HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, cachedMaterializedViewTable, materializedViewTable);
+              result = false;
+            } else {
+              // Obtain additional information to decide whether to try incremental rewriting / rebuild
+              // We will not try partial rewriting if there were update/delete operations on source tables
+              Materialization invalidationInfo = getMSC().getMaterializationInvalidationInfo(
+                  materializedViewTable.getCreationMetadata(), conf.get(ValidTxnList.VALID_TXNS_KEY));
+              if (invalidationInfo == null || invalidationInfo.isSourceTablesUpdateDeleteModified()) {
+                // We ignore (as it did not meet the requirements), but we do not need to update it in the
+                // registry, since it is up-to-date
+                result = false;
+              }
+            }
+          } else if (!cachedMaterializedViewTable.equals(materializedViewTable)) {
+            // Update the registry
+            HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, cachedMaterializedViewTable, materializedViewTable);
+          }
+        }
+      }
+      return result;
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  /**
+   * Get the materialized views that have been enabled for rewriting from the
    * metastore. If the materialized view is in the cache, we do not need to
    * parse it to generate a logical plan for the rewriting. Instead, we
    * return the version present in the cache. Further, information provided
@@ -1623,33 +1767,42 @@ public class Hive {
    * @return the list of materialized views available for rewriting
    * @throws HiveException
    */
-  public List<RelOptMaterialization> getAllValidMaterializedViews(List<String> tablesUsed, boolean forceMVContentsUpToDate,
-      HiveTxnManager txnMgr) throws HiveException {
-    // Final result
-    List<RelOptMaterialization> result = new ArrayList<>();
-    try {
-      // From metastore (for security)
-      List<Table> materializedViews = getAllMaterializedViewObjectsForRewriting();
-      if (materializedViews.isEmpty()) {
-        // Bail out: empty list
-        return result;
-      }
-      result.addAll(getValidMaterializedViews(materializedViews,
-          tablesUsed, forceMVContentsUpToDate, txnMgr));
-      return result;
-    } catch (Exception e) {
-      throw new HiveException(e);
+  public List<RelOptMaterialization> getPreprocessedMaterializedViews(
+      List<String> tablesUsed, HiveTxnManager txnMgr)
+      throws HiveException {
+    // From metastore
+    List<Table> materializedViewTables =
+        getAllMaterializedViewObjectsForRewriting();
+    if (materializedViewTables.isEmpty()) {
+      // Bail out: empty list
+      return new ArrayList<>();
     }
+    // Return final result
+    return getValidMaterializedViews(materializedViewTables, tablesUsed, false, txnMgr);
   }
 
-  public List<RelOptMaterialization> getValidMaterializedView(String dbName, String materializedViewName,
-      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr) throws HiveException {
-    return getValidMaterializedViews(ImmutableList.of(getTable(dbName, materializedViewName)),
-            tablesUsed, forceMVContentsUpToDate, txnMgr);
+  /**
+   * Get the target materialized view from the metastore. Although it may load the plan
+   * from the registry, it is guaranteed to return a version that is up-to-date
+   * wrt the metastore.
+   *
+   * @return the materialized view for rebuild
+   * @throws HiveException
+   */
+  public RelOptMaterialization getMaterializedViewForRebuild(String dbName, String materializedViewName,
+      List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
+    List<RelOptMaterialization> validMaterializedViews = getValidMaterializedViews(
+        ImmutableList.of(getTable(dbName, materializedViewName)), tablesUsed, true, txnMgr);
+    if (validMaterializedViews.isEmpty()) {
+      return null;
+    }
+    assert validMaterializedViews.size() == 1;
+    return validMaterializedViews.get(0);
   }
 
   private List<RelOptMaterialization> getValidMaterializedViews(List<Table> materializedViewTables,
-      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr) throws HiveException {
+      List<String> tablesUsed, boolean forceMVContentsUpToDate, HiveTxnManager txnMgr)
+      throws HiveException {
     final String validTxnsList = conf.get(ValidTxnList.VALID_TXNS_KEY);
     final ValidTxnWriteIdList currentTxnWriteIds = txnMgr.getValidWriteIds(tablesUsed, validTxnsList);
     final boolean tryIncrementalRewriting =
@@ -1672,7 +1825,7 @@ public class Hive {
         final CreationMetadata creationMetadata = materializedViewTable.getCreationMetadata();
         if (outdated) {
           // The MV is outdated, see whether we should consider it for rewriting or not
-          boolean ignore = false;
+          boolean ignore;
           if (forceMVContentsUpToDate && !tryIncrementalRebuild) {
             // We will not try partial rewriting for rebuild if incremental rebuild is disabled
             ignore = true;
@@ -1706,8 +1859,7 @@ public class Hive {
           } else {
             cachedMaterializedViewTable = (RelOptHiveTable) viewScan.getTable();
           }
-          if (cachedMaterializedViewTable.getHiveTableMD().getCreateTime() ==
-              materializedViewTable.getCreateTime()) {
+          if (cachedMaterializedViewTable.getHiveTableMD().equals(materializedViewTable)) {
             // It is in the cache and up to date
             if (outdated) {
               // We will rewrite it to include the filters on transaction list
@@ -1722,31 +1874,23 @@ public class Hive {
         }
 
         // It was not present in the cache (maybe because it was added by another HS2)
-        // or it is not up to date.
-        if (HiveMaterializedViewsRegistry.get().isInitialized()) {
-          // But the registry was fully initialized, thus we need to add it
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() +
-                " was not in the cache");
-          }
-          materialization = HiveMaterializedViewsRegistry.get().createMaterializedView(
-              conf, materializedViewTable);
-          if (materialization != null) {
-            if (outdated) {
-              // We will rewrite it to include the filters on transaction list
-              // so we can produce partial rewritings
-              materialization = augmentMaterializationWithTimeInformation(
-                  materialization, validTxnsList, new ValidTxnWriteIdList(
-                      creationMetadata.getValidTxnList()));
-            }
-            result.add(materialization);
-          }
-        } else {
-          // Otherwise the registry has not been initialized, skip for the time being
-          if (LOG.isWarnEnabled()) {
-            LOG.info("Materialized view " + materializedViewTable.getFullyQualifiedName() + " was skipped "
-                + "because cache has not been loaded yet");
+        // or it is not up to date. We need to add it
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Materialized view " + materializedViewTable.getFullyQualifiedName() +
+              " was not in the cache");
+        }
+        materialization = HiveMaterializedViewsRegistry.get().createMaterialization(
+            conf, materializedViewTable);
+        if (materialization != null) {
+          HiveMaterializedViewsRegistry.get().refreshMaterializedView(conf, null, materializedViewTable);
+          if (outdated) {
+            // We will rewrite it to include the filters on transaction list
+            // so we can produce partial rewritings
+            materialization = augmentMaterializationWithTimeInformation(
+                materialization, validTxnsList, new ValidTxnWriteIdList(
+                    creationMetadata.getValidTxnList()));
           }
+          result.add(materialization);
         }
       }
       return result;
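
Taken together, Hive.java now exposes a fast path and a safety net: compilation
pulls preprocessed materializations straight from the registry, and only the
materialized views that actually survive into the rewritten plan are validated
against the metastore. A condensed sketch of how a caller combines the two new
methods (CalcitePlanner below is the real caller; this wrapper class is
illustrative only):

    import java.util.List;
    import org.apache.calcite.plan.RelOptMaterialization;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.Table;

    public final class RegistryRewriteFlowSketch {
      // Fast path: feed the planner from the registry cache, no metastore round trip.
      static List<RelOptMaterialization> plannerInput(Hive db, List<String> tablesUsed,
          HiveTxnManager txnMgr) throws HiveException {
        return db.getPreprocessedMaterializedViewsFromRegistry(tablesUsed, txnMgr);
      }

      // Safety net: after rewriting, check the MVs used in the plan against the
      // metastore; if this returns false the caller falls back to the original plan.
      static boolean rewriteStillValid(Hive db, List<Table> mvsUsedInPlan,
          List<String> tablesUsed, HiveTxnManager txnMgr) throws HiveException {
        return db.validateMaterializedViewsFromRegistry(mvsUsedInPlan, tablesUsed, txnMgr);
      }
    }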
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
index c339258..4592f5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMaterializedViewsRegistry.java
@@ -25,12 +25,14 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.function.BiFunction;
 import org.apache.calcite.adapter.druid.DruidQuery;
 import org.apache.calcite.adapter.druid.DruidSchema;
 import org.apache.calcite.adapter.druid.DruidTable;
@@ -40,6 +42,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptMaterialization;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -49,10 +52,12 @@ import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.DefaultMetaStoreFilterHookImpl;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.HiveTypeSystemImpl;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
@@ -84,6 +89,7 @@ import com.google.common.collect.ImmutableList;
 public final class HiveMaterializedViewsRegistry {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveMaterializedViewsRegistry.class);
+  private static final String CLASS_NAME = HiveMaterializedViewsRegistry.class.getName();
 
   /* Singleton */
   private static final HiveMaterializedViewsRegistry SINGLETON = new HiveMaterializedViewsRegistry();
@@ -138,11 +144,13 @@ public final class HiveMaterializedViewsRegistry {
       LOG.info("Using dummy materialized views registry");
     } else {
       // We initialize the cache
-      ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
-        .setNameFormat("HiveMaterializedViewsRegistry-%d")
-         .build());
-      pool.submit(new Loader(db));
-      pool.shutdown();
+      long period = HiveConf.getTimeVar(db.getConf(), ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_REFRESH, TimeUnit.SECONDS);
+      ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("HiveMaterializedViewsRegistry-%d")
+              .build());
+      pool.scheduleAtFixedRate(new Loader(db), 0, period, TimeUnit.SECONDS);
     }
   }
 
@@ -155,20 +163,44 @@ public final class HiveMaterializedViewsRegistry {
 
     @Override
     public void run() {
+      SessionState ss = new SessionState(db.getConf());
+      ss.setIsHiveServerQuery(true); // All is served from HS2, we do not need e.g. Tez sessions
+      SessionState.start(ss);
+      PerfLogger perfLogger = SessionState.getPerfLogger();
+      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.MATERIALIZED_VIEWS_REGISTRY_REFRESH);
       try {
-        SessionState ss = new SessionState(db.getConf());
-        ss.setIsHiveServerQuery(true); // All is served from HS2, we do not need e.g. Tez sessions
-        SessionState.start(ss);
-        final boolean cache = !db.getConf()
-            .get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname).equals("DUMMY");
-        for (Table mv : db.getAllMaterializedViewObjectsForRewriting()) {
-          addMaterializedView(db.getConf(), mv, OpType.LOAD, cache);
+        if (initialized.get()) {
+          for (Table mvTable : db.getAllMaterializedViewObjectsForRewriting()) {
+            RelOptMaterialization existingMV = getRewritingMaterializedView(mvTable.getDbName(), mvTable.getTableName());
+            if (existingMV != null) {
+              // We replace if the existing MV is not newer
+              Table existingMVTable = extractTable(existingMV);
+              if (existingMVTable.getCreateTime() < mvTable.getCreateTime() ||
+                  (existingMVTable.getCreateTime() == mvTable.getCreateTime() &&
+                      existingMVTable.getCreationMetadata().getMaterializationTime() <= mvTable.getCreationMetadata().getMaterializationTime())) {
+                refreshMaterializedView(db.getConf(), existingMVTable, mvTable);
+              }
+            } else {
+              // Simply add it if it still does not exist
+              refreshMaterializedView(db.getConf(), null, mvTable);
+            }
+          }
+          LOG.info("Materialized views registry has been refreshed");
+        } else {
+          for (Table mvTable : db.getAllMaterializedViewObjectsForRewriting()) {
+            refreshMaterializedView(db.getConf(), null, mvTable);
+          }
+          initialized.set(true);
+          LOG.info("Materialized views registry has been initialized");
         }
-        initialized.set(true);
-        LOG.info("Materialized views registry has been initialized");
       } catch (HiveException e) {
-        LOG.error("Problem connecting to the metastore when initializing the view registry", e);
+        if (initialized.get()) {
+          LOG.error("Problem connecting to the metastore when refreshing the view registry", e);
+        } else {
+          LOG.error("Problem connecting to the metastore when initializing the view registry", e);
+        }
       }
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.MATERIALIZED_VIEWS_REGISTRY_REFRESH);
     }
   }
 
@@ -177,43 +209,9 @@ public final class HiveMaterializedViewsRegistry {
   }
 
   /**
-   * Adds a newly created materialized view to the cache.
-   *
-   * @param materializedViewTable the materialized view
+   * Parses and creates a materialization.
    */
-  public RelOptMaterialization createMaterializedView(HiveConf conf, Table materializedViewTable) {
-    final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
-        .equals("DUMMY");
-    return addMaterializedView(conf, materializedViewTable, OpType.CREATE, cache);
-  }
-
-  /**
-   * Adds the materialized view to the cache.
-   *
-   * @param materializedViewTable the materialized view
-   */
-  private RelOptMaterialization addMaterializedView(HiveConf conf, Table materializedViewTable,
-                                                    OpType opType, boolean cache) {
-    // Bail out if it is not enabled for rewriting
-    if (!materializedViewTable.isRewriteEnabled()) {
-      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
-          " ignored; it is not rewrite enabled");
-      return null;
-    }
-
-    // We are going to create the map for each view in the given database
-    ConcurrentMap<String, RelOptMaterialization> cq =
-        new ConcurrentHashMap<String, RelOptMaterialization>();
-    if (cache) {
-      // If we are caching the MV, we include it in the cache
-      final ConcurrentMap<String, RelOptMaterialization> prevCq = materializedViews.putIfAbsent(
-          materializedViewTable.getDbName(), cq);
-      if (prevCq != null) {
-        cq = prevCq;
-      }
-    }
-
-    // Start the process to add MV to the cache
+  public RelOptMaterialization createMaterialization(HiveConf conf, Table materializedViewTable) {
     // First we parse the view query and create the materialization object
     final String viewQuery = materializedViewTable.getViewExpandedText();
     final RelNode viewScan = createMaterializedViewScan(conf, materializedViewTable);
@@ -231,36 +229,126 @@ public final class HiveMaterializedViewsRegistry {
       return null;
     }
 
-    RelOptMaterialization materialization = new RelOptMaterialization(viewScan, queryRel,
+    return new RelOptMaterialization(viewScan, queryRel,
         null, viewScan.getTable().getQualifiedName());
-    if (opType == OpType.CREATE) {
-      // You store the materialized view
-      cq.put(materializedViewTable.getTableName(), materialization);
-    } else {
-      // For LOAD, you only add it if it does exist as you might be loading an outdated MV
-      cq.putIfAbsent(materializedViewTable.getTableName(), materialization);
+  }
+
+  /**
+   * Adds a newly created materialized view to the cache.
+   */
+  public void createMaterializedView(HiveConf conf, Table materializedViewTable) {
+    final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
+        .equals("DUMMY");
+    if (!cache) {
+      // Nothing to do, bail out
+      return;
+    }
+
+    // Bail out if it is not enabled for rewriting
+    if (!materializedViewTable.isRewriteEnabled()) {
+      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
+          " ignored; it is not rewrite enabled");
+      return;
+    }
+
+    // We are going to create the map for each view in the given database
+    ConcurrentMap<String, RelOptMaterialization> dbMap =
+        new ConcurrentHashMap<String, RelOptMaterialization>();
+    // If we are caching the MV, we include it in the cache
+    final ConcurrentMap<String, RelOptMaterialization> prevDbMap = materializedViews.putIfAbsent(
+        materializedViewTable.getDbName(), dbMap);
+    if (prevDbMap != null) {
+      dbMap = prevDbMap;
     }
 
+    RelOptMaterialization materialization = createMaterialization(conf, materializedViewTable);
+    if (materialization == null) {
+      return;
+    }
+    // Store the materialized view
+    dbMap.put(materializedViewTable.getTableName(), materialization);
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Created materialized view for rewriting: " + viewScan.getTable().getQualifiedName());
+      LOG.debug("Created materialized view for rewriting: " + materializedViewTable.getFullyQualifiedName());
     }
-    return materialization;
   }
 
   /**
-   * Removes the materialized view from the cache.
-   *
-   * @param materializedViewTable the materialized view to remove
+   * Update the materialized view in the registry (if the existing materialized view matches).
+   */
+  public void refreshMaterializedView(HiveConf conf, Table oldMaterializedViewTable, Table materializedViewTable) {
+    final boolean cache = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
+        .equals("DUMMY");
+    if (!cache) {
+      // Nothing to do, bail out
+      return;
+    }
+
+    // Bail out if it is not enabled for rewriting
+    if (!materializedViewTable.isRewriteEnabled()) {
+      dropMaterializedView(oldMaterializedViewTable);
+      LOG.debug("Materialized view " + materializedViewTable.getCompleteName() +
+          " dropped; it is not rewrite enabled");
+      return;
+    }
+
+    // We are going to create the map for each view in the given database
+    ConcurrentMap<String, RelOptMaterialization> dbMap =
+        new ConcurrentHashMap<String, RelOptMaterialization>();
+    // If we are caching the MV, we include it in the cache
+    final ConcurrentMap<String, RelOptMaterialization> prevDbMap = materializedViews.putIfAbsent(
+        materializedViewTable.getDbName(), dbMap);
+    if (prevDbMap != null) {
+      dbMap = prevDbMap;
+    }
+    final RelOptMaterialization newMaterialization = createMaterialization(conf, materializedViewTable);
+    if (newMaterialization == null) {
+      return;
+    }
+    dbMap.compute(materializedViewTable.getTableName(), new BiFunction<String, RelOptMaterialization, RelOptMaterialization>() {
+      @Override
+      public RelOptMaterialization apply(String tableName, RelOptMaterialization existingMaterialization) {
+        if (existingMaterialization == null) {
+          // If it was not existing, we just create it
+          return newMaterialization;
+        }
+        Table existingMaterializedViewTable = extractTable(existingMaterialization);
+        if (existingMaterializedViewTable.equals(oldMaterializedViewTable)) {
+          // If the old version is the same, we replace it
+          return newMaterialization;
+        }
+        // Otherwise, we return existing materialization
+        return existingMaterialization;
+      }
+    });
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Materialized view refreshed: " + materializedViewTable.getFullyQualifiedName());
+    }
+  }
+
+  /**
+   * Removes the materialized view from the cache (based on table object equality), if it exists.
    */
   public void dropMaterializedView(Table materializedViewTable) {
-    dropMaterializedView(materializedViewTable.getDbName(), materializedViewTable.getTableName());
+    ConcurrentMap<String, RelOptMaterialization> dbMap = materializedViews.get(materializedViewTable.getDbName());
+    if (dbMap != null) {
+      // Delete only if the input materialized view table equals the table currently
+      // in the map. Otherwise, keep the one in the map.
+      dbMap.computeIfPresent(materializedViewTable.getTableName(), new BiFunction<String, RelOptMaterialization, RelOptMaterialization>() {
+        @Override
+        public RelOptMaterialization apply(String tableName, RelOptMaterialization oldMaterialization) {
+          if (extractTable(oldMaterialization).equals(materializedViewTable)) {
+            return null;
+          }
+          return oldMaterialization;
+        }
+      });
+    }
   }
 
   /**
-   * Removes the materialized view from the cache.
-   *
-   * @param dbName the db for the materialized view to remove
-   * @param tableName the name for the materialized view to remove
+   * Removes the materialized view from the cache (based on qualified name), if it exists.
    */
   public void dropMaterializedView(String dbName, String tableName) {
     ConcurrentMap<String, RelOptMaterialization> dbMap = materializedViews.get(dbName);
@@ -270,9 +358,19 @@ public final class HiveMaterializedViewsRegistry {
   }
 
   /**
+   * Returns all the materialized views in the cache.
+   *
+   * @return the collection of materialized views, or the empty collection if none
+   */
+  List<RelOptMaterialization> getRewritingMaterializedViews() {
+    List<RelOptMaterialization> result = new ArrayList<>();
+    materializedViews.forEach((dbName, mvs) -> result.addAll(mvs.values()));
+    return result;
+  }
+
+  /**
    * Returns the materialized views in the cache for the given database.
    *
-   * @param dbName the database
    * @return the collection of materialized views, or the empty collection if none
    */
   RelOptMaterialization getRewritingMaterializedView(String dbName, String viewName) {
@@ -422,6 +520,17 @@ public final class HiveMaterializedViewsRegistry {
     return TableType.NATIVE;
   }
 
+  private static Table extractTable(RelOptMaterialization materialization) {
+    RelOptHiveTable cachedMaterializedViewTable;
+    if (materialization.tableRel instanceof Project) {
+      // There is a Project on top (due to nullability)
+      cachedMaterializedViewTable = (RelOptHiveTable) materialization.tableRel.getInput(0).getTable();
+    } else {
+      cachedMaterializedViewTable = (RelOptHiveTable) materialization.tableRel.getTable();
+    }
+    return cachedMaterializedViewTable.getHiveTableMD();
+  }
+
   //@TODO this seems to be the same as org.apache.hadoop.hive.ql.parse.CalcitePlanner.TableType.DRUID do we really need both
   private enum TableType {
     DRUID,
@@ -429,9 +538,4 @@ public final class HiveMaterializedViewsRegistry {
     JDBC
   }
 
-  private enum OpType {
-    CREATE, //view just being created
-    LOAD // already created view being loaded
-  }
-
-}
\ No newline at end of file
+}
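
Both refreshMaterializedView and the table-based dropMaterializedView rely on
ConcurrentMap.compute/computeIfPresent so that "replace (or remove) only if the
cached entry is the one we expect" is a single atomic step, which is what makes
concurrent updates from the Loader thread and from DDL operations safe. A
self-contained toy of that compare-and-swap pattern (plain strings stand in for
the Hive table and materialization types):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public final class CasCacheSketch {
      public static void main(String[] args) {
        ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
        cache.put("mv1", "v1");

        // Replace mv1 only if the cached value is still "v1"
        // (analogous to refreshMaterializedView matching the old table)
        cache.compute("mv1", (name, existing) ->
            "v1".equals(existing) ? "v2" : existing);

        // Remove mv1 only if the cached value is still "v2"
        // (analogous to dropMaterializedView(Table) matching on equality)
        cache.computeIfPresent("mv1", (name, existing) ->
            "v2".equals(existing) ? null : existing);

        System.out.println(cache.isEmpty()); // true: both CAS steps matched
      }
    }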
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 60cd715..ef2ebac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -2247,7 +2247,10 @@ public class CalcitePlanner extends SemanticAnalyzer {
       final RelOptCluster optCluster = basePlan.getCluster();
       final PerfLogger perfLogger = SessionState.getPerfLogger();
 
+      final boolean useMaterializedViewsRegistry = !conf.get(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname)
+          .equals("DUMMY");
       final RelNode calcitePreMVRewritingPlan = basePlan;
+      final List<String> tablesUsedQuery = getTablesUsed(basePlan);
       final boolean mvRebuild = mvRebuildMode != MaterializationRebuildMode.NONE;
 
       // Add views to planner
@@ -2257,13 +2260,21 @@ public class CalcitePlanner extends SemanticAnalyzer {
           // We only retrieve the materialization corresponding to the rebuild. In turn,
           // we pass 'true' for the forceMVContentsUpToDate parameter, as we cannot allow the
           // materialization contents to be stale for a rebuild if we want to use it.
-          materializations = db.getValidMaterializedView(mvRebuildDbName, mvRebuildName,
-              getTablesUsed(basePlan), true, getTxnMgr());
+          RelOptMaterialization materialization = db.getMaterializedViewForRebuild(
+              mvRebuildDbName, mvRebuildName, tablesUsedQuery, getTxnMgr());
+          if (materialization != null) {
+            materializations.add(materialization);
+          }
         } else {
-          // This is not a rebuild, we retrieve all the materializations. In turn, we do not need
-          // to force the materialization contents to be up-to-date, as this is not a rebuild, and
-          // we apply the user parameters (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead.
-          materializations = db.getAllValidMaterializedViews(getTablesUsed(basePlan), false, getTxnMgr());
+          // This is not a rebuild, we retrieve all the materializations.
+          // In turn, we do not need to force the materialization contents to be up-to-date,
+          // as this is not a rebuild, and we apply the user parameters
+          // (HIVE_MATERIALIZED_VIEW_REWRITING_TIME_WINDOW) instead.
+          if (useMaterializedViewsRegistry) {
+            materializations = db.getPreprocessedMaterializedViewsFromRegistry(tablesUsedQuery, getTxnMgr());
+          } else {
+            materializations = db.getPreprocessedMaterializedViews(tablesUsedQuery, getTxnMgr());
+          }
         }
         // We need to use the current cluster for the scan operator on views,
         // otherwise the planner will throw an Exception (different planners)
@@ -2310,80 +2321,101 @@ public class CalcitePlanner extends SemanticAnalyzer {
       } catch (HiveException e) {
         LOG.warn("Exception loading materialized views", e);
       }
-      if (!materializations.isEmpty()) {
-        perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
 
-        if (mvRebuild) {
-          // If it is a materialized view rebuild, we use the HepPlanner, since we only have
-          // one MV and we would like to use it to create incremental maintenance plans
-          HepPlanner hepPlanner = createHepPlanner(basePlan.getCluster(), true, mdProvider, null,
-              HepMatchOrder.TOP_DOWN, HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES);
-          // Add materialization for rebuild to planner
-          assert materializations.size() == 1;
-          hepPlanner.addMaterialization(materializations.get(0));
-          // Optimize plan
-          hepPlanner.setRoot(basePlan);
-          basePlan = hepPlanner.findBestExp();
-        } else {
-          // If this is not a rebuild, we use Volcano planner as the decision
-          // on whether to use MVs or not and which MVs to use should be cost-based
-          optCluster.invalidateMetadataQuery();
-          RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT);
-
-          // Add materializations to planner
-          for (RelOptMaterialization materialization : materializations) {
-            planner.addMaterialization(materialization);
-          }
-          // Add rule to split aggregate with grouping sets (if any)
-          planner.addRule(HiveAggregateSplitRule.INSTANCE);
-          // Add view-based rewriting rules to planner
-          for (RelOptRule rule : HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES) {
-            planner.addRule(rule);
-          }
-          // Partition pruner rule
-          planner.addRule(HiveFilterProjectTSTransposeRule.INSTANCE);
-          planner.addRule(new HivePartitionPruneRule(conf));
-
-          // Optimize plan
-          planner.setRoot(basePlan);
-          basePlan = planner.findBestExp();
-          // Remove view-based rewriting rules from planner
-          planner.clear();
-
-          // Restore default cost model
-          optCluster.invalidateMetadataQuery();
-          RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider));
+      if (materializations.isEmpty()) {
+        // There are no materializations, we can return the original plan
+        return calcitePreMVRewritingPlan;
+      }
+
+      perfLogger.PerfLogBegin(this.getClass().getName(), PerfLogger.OPTIMIZER);
+
+      if (mvRebuild) {
+        // If it is a materialized view rebuild, we use the HepPlanner, since we only have
+        // one MV and we would like to use it to create incremental maintenance plans
+        HepPlanner hepPlanner = createHepPlanner(basePlan.getCluster(), true, mdProvider, null,
+            HepMatchOrder.TOP_DOWN, HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES);
+        // Add materialization for rebuild to planner
+        assert materializations.size() == 1;
+        hepPlanner.addMaterialization(materializations.get(0));
+        // Optimize plan
+        hepPlanner.setRoot(basePlan);
+        basePlan = hepPlanner.findBestExp();
+      } else {
+        // If this is not a rebuild, we use Volcano planner as the decision
+        // on whether to use MVs or not and which MVs to use should be cost-based
+        optCluster.invalidateMetadataQuery();
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT);
+
+        // Add materializations to planner
+        for (RelOptMaterialization materialization : materializations) {
+          planner.addMaterialization(materialization);
         }
+        // Add rule to split aggregate with grouping sets (if any)
+        planner.addRule(HiveAggregateSplitRule.INSTANCE);
+        // Add view-based rewriting rules to planner
+        for (RelOptRule rule : HiveMaterializedViewRule.MATERIALIZED_VIEW_REWRITING_RULES) {
+          planner.addRule(rule);
+        }
+        // Partition pruner rule
+        planner.addRule(HiveFilterProjectTSTransposeRule.INSTANCE);
+        planner.addRule(new HivePartitionPruneRule(conf));
 
-        perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
-
-        if (!RelOptUtil.toString(calcitePreMVRewritingPlan).equals(RelOptUtil.toString(basePlan))) {
-          // A rewriting was produced, we will check whether it was part of an incremental rebuild
-          // to try to replace INSERT OVERWRITE by INSERT or MERGE
-          if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD &&
-              HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL)) {
-            // First we need to check if it is valid to convert to MERGE/INSERT INTO.
-            // If we succeed, we modify the plan and afterwards the AST.
-            // MV should be an acid table.
-            MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
-            visitor.go(basePlan);
-            if (visitor.isRewritingAllowed()) {
-              // Trigger rewriting to remove UNION branch with MV
-              if (visitor.isContainsAggregate()) {
-                basePlan = hepPlan(basePlan, false, mdProvider, null,
-                    HepMatchOrder.TOP_DOWN, HiveAggregateIncrementalRewritingRule.INSTANCE);
-                mvRebuildMode = MaterializationRebuildMode.AGGREGATE_REBUILD;
-              } else {
-                basePlan = hepPlan(basePlan, false, mdProvider, null,
-                    HepMatchOrder.TOP_DOWN, HiveNoAggregateIncrementalRewritingRule.INSTANCE);
-                mvRebuildMode = MaterializationRebuildMode.NO_AGGREGATE_REBUILD;
-              }
+        // Optimize plan
+        planner.setRoot(basePlan);
+        basePlan = planner.findBestExp();
+        // Remove view-based rewriting rules from planner
+        planner.clear();
+
+        // Restore default cost model
+        optCluster.invalidateMetadataQuery();
+        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(mdProvider));
+      }
+
+      perfLogger.PerfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
+
+      List<Table> materializedViewsUsedOriginalPlan = getMaterializedViewsUsed(calcitePreMVRewritingPlan);
+      List<Table> materializedViewsUsedAfterRewrite = getMaterializedViewsUsed(basePlan);
+      if (materializedViewsUsedOriginalPlan.size() == materializedViewsUsedAfterRewrite.size()) {
+        // Materialized view-based rewriting did not happen, we can return the original plan
+        return calcitePreMVRewritingPlan;
+      }
+
+      // A rewriting was produced, we will check whether it was part of an incremental rebuild
+      // to try to replace INSERT OVERWRITE by INSERT or MERGE
+      if (mvRebuildMode == MaterializationRebuildMode.INSERT_OVERWRITE_REBUILD) {
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL)) {
+          // First we need to check if it is valid to convert to MERGE/INSERT INTO.
+          // If we succeed, we modify the plan and afterwards the AST.
+          // MV should be an acid table.
+          MaterializedViewRewritingRelVisitor visitor = new MaterializedViewRewritingRelVisitor();
+          visitor.go(basePlan);
+          if (visitor.isRewritingAllowed()) {
+            // Trigger rewriting to remove UNION branch with MV
+            if (visitor.isContainsAggregate()) {
+              basePlan = hepPlan(basePlan, false, mdProvider, null,
+                  HepMatchOrder.TOP_DOWN, HiveAggregateIncrementalRewritingRule.INSTANCE);
+              mvRebuildMode = MaterializationRebuildMode.AGGREGATE_REBUILD;
+            } else {
+              basePlan = hepPlan(basePlan, false, mdProvider, null,
+                  HepMatchOrder.TOP_DOWN, HiveNoAggregateIncrementalRewritingRule.INSTANCE);
+              mvRebuildMode = MaterializationRebuildMode.NO_AGGREGATE_REBUILD;
             }
           }
-          // Now we trigger some needed optimization rules again
-          basePlan = applyPreJoinOrderingTransforms(basePlan, mdProvider, executorProvider);
+        }
+      } else if (useMaterializedViewsRegistry) {
+        // Before proceeding we need to check whether materialized views used are up-to-date
+        // wrt information in metastore
+        try {
+          if (!db.validateMaterializedViewsFromRegistry(materializedViewsUsedAfterRewrite, tablesUsedQuery, getTxnMgr())) {
+            return calcitePreMVRewritingPlan;
+          }
+        } catch (HiveException e) {
+          LOG.warn("Exception validating materialized views", e);
+          return calcitePreMVRewritingPlan;
         }
       }
+      // Now we trigger some needed optimization rules again
+      basePlan = applyPreJoinOrderingTransforms(basePlan, mdProvider, executorProvider);
 
       if (mvRebuildMode == MaterializationRebuildMode.AGGREGATE_REBUILD) {
         // Make a cost-based decision factoring the configuration property
@@ -2392,7 +2424,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
         RelMetadataQuery mq = RelMetadataQuery.instance();
         RelOptCost costOriginalPlan = mq.getCumulativeCost(calcitePreMVRewritingPlan);
         final double factorSelectivity = (double) HiveConf.getFloatVar(
-          conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL_FACTOR);
+            conf, HiveConf.ConfVars.HIVE_MATERIALIZED_VIEW_REBUILD_INCREMENTAL_FACTOR);
         RelOptCost costRebuildPlan = mq.getCumulativeCost(basePlan).multiplyBy(factorSelectivity);
         if (costOriginalPlan.isLe(costRebuildPlan)) {
           basePlan = calcitePreMVRewritingPlan;
@@ -2420,6 +2452,30 @@ public class CalcitePlanner extends SemanticAnalyzer {
       return tablesUsed;
     }
 
+    private List<Table> getMaterializedViewsUsed(RelNode plan) {
+      List<Table> materializedViewsUsed = new ArrayList<>();
+      new RelVisitor() {
+        @Override
+        public void visit(RelNode node, int ordinal, RelNode parent) {
+          if (node instanceof TableScan) {
+            TableScan ts = (TableScan) node;
+            Table table = ((RelOptHiveTable) ts.getTable()).getHiveTableMD();
+            if (table.isMaterializedView()) {
+              materializedViewsUsed.add(table);
+            }
+          } else if (node instanceof DruidQuery) {
+            DruidQuery dq = (DruidQuery) node;
+            Table table = ((RelOptHiveTable) dq.getTable()).getHiveTableMD();
+            if (table.isMaterializedView()) {
+              materializedViewsUsed.add(table);
+            }
+          }
+          super.visit(node, ordinal, parent);
+        }
+      }.go(plan);
+      return materializedViewsUsed;
+    }
+
     /**
      * Run the HEP Planner with the given rule set.
      *
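
The planner change boils down to a three-way decision: keep the original plan
when no rewriting fired (the materialized view scan counts match), keep it when
a cached MV turns out to be stale wrt the metastore, and keep the rewritten
plan otherwise. A toy distillation of that decision (generic over the plan
type; not the actual Calcite planner code):

    public final class RewriteDecisionSketch {
      static <P> P choosePlan(P preRewritePlan, P rewrittenPlan,
          int mvScansBefore, int mvScansAfter, boolean registryStillValid) {
        if (mvScansBefore == mvScansAfter) {
          return preRewritePlan;  // no view-based rewriting happened
        }
        if (!registryStillValid) {
          return preRewritePlan;  // cached MV diverged from the metastore
        }
        return rewrittenPlan;     // rewriting fired and the cache checks out
      }
    }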
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 1865d77..7b0e5e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -115,6 +115,7 @@ import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableUnarchiveDesc;
 import org.apache.hadoop.hive.ql.ddl.view.AlterMaterializedViewRewriteDesc;
 import org.apache.hadoop.hive.ql.ddl.view.DropMaterializedViewDesc;
 import org.apache.hadoop.hive.ql.ddl.view.DropViewDesc;
+import org.apache.hadoop.hive.ql.ddl.view.MaterializedViewUpdateDesc;
 import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsUpdateTask;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
@@ -3274,7 +3275,22 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     inputs.add(new ReadEntity(materializedViewTable));
     outputs.add(new WriteEntity(materializedViewTable, WriteEntity.WriteType.DDL_EXCLUSIVE));
-    rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterMVRewriteDesc)));
+
+    // Create task for alterMVRewriteDesc
+    DDLWork work = new DDLWork(getInputs(), getOutputs(), alterMVRewriteDesc);
+    Task<?> targetTask = TaskFactory.get(work);
+
+    // Create task to update the rewrite flag as a dependent of the previous one
+    String tableName = alterMVRewriteDesc.getMaterializedViewName();
+    boolean retrieveAndInclude = alterMVRewriteDesc.isRewriteEnable();
+    boolean disableRewrite = !alterMVRewriteDesc.isRewriteEnable();
+    MaterializedViewUpdateDesc materializedViewUpdateDesc =
+        new MaterializedViewUpdateDesc(tableName, retrieveAndInclude, disableRewrite, false);
+    DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), materializedViewUpdateDesc);
+    targetTask.addDependentTask(TaskFactory.get(ddlWork, conf));
+
+    // Add root task
+    rootTasks.add(targetTask);
   }
 
 }
diff --git a/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q b/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
index a3f437e..f617eaf 100644
--- a/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
+++ b/ql/src/test/queries/clientpositive/materialized_view_partitioned_2.q
@@ -32,8 +32,8 @@ SELECT * FROM src_txn_2 where value > 'val_220' and value < 'val_230';
 
 SELECT * FROM src_txn_2 where value > 'val_220' and value < 'val_230';
 
--- SHOULD CHOOSE partition_mv_4 SINCE IT IS THE MOST EFFICIENT
--- READING ONLY ONE PARTITION
+-- SHOULD CHOOSE partition_mv_1, partition_mv_3 OR partition_mv_4
+-- SINCE IT IS THE MOST EFFICIENT READING ONLY ONE PARTITION
 EXPLAIN
 SELECT * FROM src_txn_2 where key > 224 and key < 226;
 
diff --git a/ql/src/test/results/clientnegative/strict_pruning_2.q.out b/ql/src/test/results/clientnegative/strict_pruning_2.q.out
index b50d411..85c16b9 100644
--- a/ql/src/test/results/clientnegative/strict_pruning_2.q.out
+++ b/ql/src/test/results/clientnegative/strict_pruning_2.q.out
@@ -1 +1 @@
-FAILED: SemanticException [Error 10056]: Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.no.partition.filter to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "srcpart" Table "srcpart"
+FAILED: SemanticException [Error 10056]: Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.no.partition.filter to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "default.srcpart" Table "srcpart"
diff --git a/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out b/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out
index 3fd6b20..6c77549 100644
--- a/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out
+++ b/ql/src/test/results/clientpositive/beeline/materialized_view_create_rewrite.q.out
@@ -260,6 +260,7 @@ POSTHOOK: Input: default@cmv_mat_view2_n4
 POSTHOOK: Output: default@cmv_mat_view2_n4
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-0
@@ -267,6 +268,11 @@ STAGE PLANS:
       name: default.cmv_mat_view2_n4
       enable: true
 
+  Stage: Stage-1
+    Materialized View Update
+      name: default.cmv_mat_view2_n4
+      retrieveAndInclude: true
+
 PREHOOK: query: alter materialized view cmv_mat_view2_n4 enable rewrite
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE
 PREHOOK: Input: default@cmv_mat_view2_n4
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out
index 53f7233..93a1103 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite.q.out
@@ -272,6 +272,7 @@ POSTHOOK: Input: default@cmv_mat_view2_n4
 POSTHOOK: Output: default@cmv_mat_view2_n4
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-0
@@ -279,6 +280,11 @@ STAGE PLANS:
       name: default.cmv_mat_view2_n4
       enable: true
 
+  Stage: Stage-1
+    Materialized View Update
+      name: default.cmv_mat_view2_n4
+      retrieveAndInclude: true
+
 PREHOOK: query: alter materialized view cmv_mat_view2_n4 enable rewrite
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE
 PREHOOK: Input: default@cmv_mat_view2_n4
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
index 81d69b1..cf93cff 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_4.q.out
@@ -467,6 +467,7 @@ POSTHOOK: Input: default@cmv_mat_view_n5
 POSTHOOK: Output: default@cmv_mat_view_n5
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-0
@@ -474,6 +475,11 @@ STAGE PLANS:
       name: default.cmv_mat_view_n5
       enable: true
 
+  Stage: Stage-1
+    Materialized View Update
+      name: default.cmv_mat_view_n5
+      retrieveAndInclude: true
+
 PREHOOK: query: ALTER MATERIALIZED VIEW cmv_mat_view_n5 ENABLE REWRITE
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE
 PREHOOK: Input: default@cmv_mat_view_n5
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out
index 25c89a5..62b74da 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_dummy.q.out
@@ -147,6 +147,7 @@ POSTHOOK: Input: default@cmv_mat_view2
 POSTHOOK: Output: default@cmv_mat_view2
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-0
@@ -154,6 +155,11 @@ STAGE PLANS:
       name: default.cmv_mat_view2
       disable: true
 
+  Stage: Stage-1
+    Materialized View Update
+      name: default.cmv_mat_view2
+      disableRewrite: true
+
 PREHOOK: query: alter materialized view cmv_mat_view2 disable rewrite
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE
 PREHOOK: Input: default@cmv_mat_view2
@@ -291,6 +297,7 @@ POSTHOOK: Input: default@cmv_mat_view2
 POSTHOOK: Output: default@cmv_mat_view2
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-0
@@ -298,6 +305,11 @@ STAGE PLANS:
       name: default.cmv_mat_view2
       enable: true
 
+  Stage: Stage-1
+    Materialized View Update
+      name: default.cmv_mat_view2
+      retrieveAndInclude: true
+
 PREHOOK: query: alter materialized view cmv_mat_view2 enable rewrite
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE
 PREHOOK: Input: default@cmv_mat_view2
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out
index 7da22c0..5f7c4f6 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create_rewrite_time_window.q.out
@@ -467,6 +467,7 @@ POSTHOOK: Input: default@cmv_mat_view_n3
 POSTHOOK: Output: default@cmv_mat_view_n3
 STAGE DEPENDENCIES:
   Stage-0 is a root stage
+  Stage-1 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-0
@@ -474,6 +475,11 @@ STAGE PLANS:
       name: default.cmv_mat_view_n3
       enable: true
 
+  Stage: Stage-1
+    Materialized View Update
+      name: default.cmv_mat_view_n3
+      retrieveAndInclude: true
+
 PREHOOK: query: ALTER MATERIALIZED VIEW cmv_mat_view_n3 ENABLE REWRITE
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REWRITE
 PREHOOK: Input: default@cmv_mat_view_n3
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out
index d9c76fb..6726c1e 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_partitioned_2.q.out
@@ -525,13 +525,13 @@ POSTHOOK: Input: default@src_txn_2
 PREHOOK: query: EXPLAIN
 SELECT * FROM src_txn_2 where key > 224 and key < 226
 PREHOOK: type: QUERY
-PREHOOK: Input: default@partition_mv_4
+PREHOOK: Input: default@partition_mv_1
 PREHOOK: Input: default@src_txn_2
 #### A masked pattern was here ####
 POSTHOOK: query: EXPLAIN
 SELECT * FROM src_txn_2 where key > 224 and key < 226
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@partition_mv_4
+POSTHOOK: Input: default@partition_mv_1
 POSTHOOK: Input: default@src_txn_2
 #### A masked pattern was here ####
 STAGE DEPENDENCIES:
@@ -543,7 +543,7 @@ STAGE PLANS:
       limit: -1
       Processor Tree:
         TableScan
-          alias: default.partition_mv_4
+          alias: default.partition_mv_1
           filterExpr: ((UDFToDouble(key) > 224.0D) and (UDFToDouble(key) < 226.0D)) (type: boolean)
           Filter Operator
             predicate: ((UDFToDouble(key) > 224.0D) and (UDFToDouble(key) < 226.0D)) (type: boolean)
@@ -554,14 +554,14 @@ STAGE PLANS:
 
 PREHOOK: query: SELECT * FROM src_txn_2 where key > 223 and key < 225
 PREHOOK: type: QUERY
-PREHOOK: Input: default@partition_mv_4
-PREHOOK: Input: default@partition_mv_4@key=224
+PREHOOK: Input: default@partition_mv_1
+PREHOOK: Input: default@partition_mv_1@key=224
 PREHOOK: Input: default@src_txn_2
 #### A masked pattern was here ####
 POSTHOOK: query: SELECT * FROM src_txn_2 where key > 223 and key < 225
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@partition_mv_4
-POSTHOOK: Input: default@partition_mv_4@key=224
+POSTHOOK: Input: default@partition_mv_1
+POSTHOOK: Input: default@partition_mv_1@key=224
 POSTHOOK: Input: default@src_txn_2
 #### A masked pattern was here ####
 224	val_224
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6029d11..504e6b1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SortedSet;
@@ -2002,14 +2003,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return null;
     }
 
-    // Parse validTxnList
-    final ValidReadTxnList validTxnList =
-        new ValidReadTxnList(validTxnListStr);
-
-    // Parse validReaderWriteIdList from creation metadata
-    final ValidTxnWriteIdList validReaderWriteIdList =
-        new ValidTxnWriteIdList(creationMetadata.getValidTxnList());
-
     // We are composing a query that returns a single row if an update happened after
     // the materialization was created. Otherwise, the query returns 0 rows.
     Connection dbConn = null;
@@ -2017,12 +2010,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     ResultSet rs = null;
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+
+      // Parse validReaderWriteIdList from creation metadata
+      final ValidTxnWriteIdList validReaderWriteIdList =
+          new ValidTxnWriteIdList(creationMetadata.getValidTxnList());
+
+      // Parse validTxnList
+      final ValidReadTxnList currentValidTxnList = new ValidReadTxnList(validTxnListStr);
+      // Get the valid write id list for the tables in their current state
+      final List<TableValidWriteIds> currentTblValidWriteIdsList = new ArrayList<>();
+      for (String fullTableName : creationMetadata.getTablesUsed()) {
+        currentTblValidWriteIdsList.add(getValidWriteIdsForTable(dbConn, fullTableName, currentValidTxnList));
+      }
+      final ValidTxnWriteIdList currentValidReaderWriteIdList = TxnCommonUtils.createValidTxnWriteIdList(
+          currentValidTxnList.getHighWatermark(), currentTblValidWriteIdsList);
+
       List<String> params = new ArrayList<>();
       StringBuilder query = new StringBuilder();
       // compose a query that selects transactions containing an update...
       query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND (");
       int i = 0;
       for (String fullyQualifiedName : creationMetadata.getTablesUsed()) {
+        ValidWriteIdList tblValidWriteIdList =
+            validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
+        if (tblValidWriteIdList == null) {
+          LOG.warn("ValidWriteIdList for table {} not present in creation metadata, this should not happen", fullyQualifiedName);
+          return null;
+        }
+
+        // First, we check whether the low watermark has moved for any of the tables.
+        // If it has, we bail out (return null), since the materialization is not
+        // incrementally refreshable: one of the commits now hidden below the
+        // watermark may have been an update/delete.
+        ValidWriteIdList currentTblValidWriteIdList =
+            currentValidReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
+        if (currentTblValidWriteIdList == null) {
+          LOG.warn("Current ValidWriteIdList for table {} not present in creation metadata, this should not happen", fullyQualifiedName);
+          return null;
+        }
+        if (!Objects.equals(currentTblValidWriteIdList.getMinOpenWriteId(), tblValidWriteIdList.getMinOpenWriteId())) {
+          LOG.debug("Minimum open write id do not match for table {}", fullyQualifiedName);
+          return null;
+        }
+
         // ...for each of the tables that are part of the materialized view,
         // where the transaction had to be committed after the materialization was created...
         if (i != 0) {
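
The minOpenWriteId comparison in the hunk above is easier to see with concrete
numbers. A minimal sketch, using made-up write ids in place of real
ValidWriteIdList objects (not part of the patch):

    // Sketch only: mirrors the Objects.equals comparison of minimum open write ids.
    class WatermarkCheck {
      // A null value means the snapshot had no open write id for the table.
      static boolean lowWatermarkMoved(Long creationMinOpenWriteId, Long currentMinOpenWriteId) {
        return !java.util.Objects.equals(creationMinOpenWriteId, currentMinOpenWriteId);
      }
    }

    // e.g. creation snapshot: minOpenWriteId = 5; current snapshot: minOpenWriteId = 7
    // lowWatermarkMoved(5L, 7L) == true -> write ids 5 and 6 were resolved in between
    // and now sit below the low watermark; one of them may have been an update/delete
    // that can no longer be checked individually, so the code above returns null.
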
@@ -2033,12 +2062,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         query.append(" (ctc_database=? AND ctc_table=?");
         params.add(names[0]);
         params.add(names[1]);
-        ValidWriteIdList tblValidWriteIdList =
-            validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
-        if (tblValidWriteIdList == null) {
-          LOG.warn("ValidWriteIdList for table {} not present in creation metadata, this should not happen", fullyQualifiedName);
-          return null;
-        }
         query.append(" AND (ctc_writeid > " + tblValidWriteIdList.getHighWatermark());
         query.append(tblValidWriteIdList.getInvalidWriteIds().length == 0 ? ") " :
             " OR ctc_writeid IN(" + StringUtils.join(",",
@@ -2048,10 +2071,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
       // ... and where the transaction has already been committed as per the snapshot
       // taken when running the current query
-      query.append(") AND ctc_txnid <= " + validTxnList.getHighWatermark());
-      query.append(validTxnList.getInvalidTransactions().length == 0 ? " " :
+      query.append(") AND ctc_txnid <= " + currentValidTxnList.getHighWatermark());
+      query.append(currentValidTxnList.getInvalidTransactions().length == 0 ? " " :
           " AND ctc_txnid NOT IN(" + StringUtils.join(",",
-              Arrays.asList(ArrayUtils.toObject(validTxnList.getInvalidTransactions()))) + ") ");
+              Arrays.asList(ArrayUtils.toObject(currentValidTxnList.getInvalidTransactions()))) + ") ");
 
       // Execute query
       String s = query.toString();
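
Pieced together, the query the code above composes has roughly the following
shape for a materialization over two source tables (a sketch only: the database
and table names are bound as ? parameters in the real code, and the watermarks,
invalid write ids and txn ids below are made-up values):

    select ctc_update_delete from COMPLETED_TXN_COMPONENTS
    where ctc_update_delete='Y'
      AND ( (ctc_database='default' AND ctc_table='t1'
             AND (ctc_writeid > 10 OR ctc_writeid IN(7,9)))
         OR (ctc_database='default' AND ctc_table='t2'
             AND (ctc_writeid > 4)) )
      AND ctc_txnid <= 120
      AND ctc_txnid NOT IN(118)

Any row coming back means an update or delete was committed on one of the source
tables after the materialization snapshot was taken, i.e. the materialization is
outdated.
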