Posted to commits@hive.apache.org by kr...@apache.org on 2023/03/14 11:31:09 UTC

[hive] branch master updated: HIVE-27101: Support incremental materialized view rebuild when Iceberg source tables have insert operation only. (Krisztian Kasa, reviewed by Denys Kuzmenko, Aman Sinha)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c5d9b90d96 HIVE-27101: Support incremental materialized view rebuild when Iceberg source tables have insert operation only. (Krisztian Kasa, reviewed by Denys Kuzmenko, Aman Sinha)
1c5d9b90d96 is described below

commit 1c5d9b90d960af8b84a1278e2faf8073a352c191
Author: Krisztian Kasa <ka...@gmail.com>
AuthorDate: Tue Mar 14 12:30:50 2023 +0100

    HIVE-27101: Support incremental materialized view rebuild when Iceberg source tables have insert operation only. (Krisztian Kasa, reviewed by Denys Kuzmenko, Aman Sinha)
---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |   1 +
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |   1 +
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  24 ++
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  35 +-
 .../src/test/queries/positive/mv_iceberg_orc2.q    |   1 +
 .../src/test/queries/positive/mv_iceberg_orc4.q    |  30 ++
 .../src/test/queries/positive/mv_iceberg_orc5.q    |  30 ++
 .../src/test/queries/positive/mv_iceberg_orc6.q    |  30 ++
 .../test/results/positive/mv_iceberg_orc2.q.out    |   2 +-
 .../test/results/positive/mv_iceberg_orc4.q.out    | 130 +++++++
 .../test/results/positive/mv_iceberg_orc5.q.out    | 135 +++++++
 .../test/results/positive/mv_iceberg_orc6.q.out    | 111 ++++++
 .../java/org/apache/iceberg/SerializableTable.java | 420 +++++++++++++++++++++
 .../org/apache/hadoop/hive/ql/parse/HiveParser.g   |   1 +
 .../AlterMaterializedViewRebuildAnalyzer.java      |   5 +
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  |   4 +
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  91 +++--
 .../hive/ql/metadata/HiveStorageHandler.java       |   4 +
 .../org/apache/hadoop/hive/ql/metadata/Table.java  |  15 +-
 .../hadoop/hive/ql/metadata/VirtualColumn.java     |   1 +
 .../calcite/reloperators/HiveTableScan.java        |   6 +-
 .../HiveAugmentSnapshotMaterializationRule.java    | 152 ++++++++
 .../rules/views/HiveMaterializedViewUtils.java     |  48 ++-
 .../views/HivePushdownSnapshotFilterRule.java      | 158 ++++++++
 .../optimizer/calcite/translator/ASTBuilder.java   |   6 +
 .../hadoop/hive/ql/parse/BaseSemanticAnalyzer.java |   6 +-
 .../hadoop/hive/ql/parse/CalcitePlanner.java       |   5 +
 .../java/org/apache/hadoop/hive/ql/parse/QB.java   |  13 +-
 .../hadoop/hive/ql/parse/QBSystemVersion.java      |  43 +++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  15 +-
 .../apache/hadoop/hive/ql/plan/TableScanDesc.java  |  13 +-
 .../hadoop/hive/common/type/SnapshotContext.java   |   7 +
 32 files changed, 1481 insertions(+), 62 deletions(-)
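
The pieces below fit together roughly as follows (editorial overview, not part of the patch): the storage handler reports whether a source table has seen only append snapshots since the snapshot recorded for the materialized view, and if so the rebuild plan reads just the appended rows by handing a from-version down to the Iceberg input format. A minimal, hedged sketch of that decision; hasAppendsOnly() is the API added in this commit, while mvSnapshot, planIncrementalRebuild and planFullRebuild are illustrative names only:

    // Sketch only: helper names are hypothetical, the API call is the one added below.
    Boolean appendsOnly = storageHandler.hasAppendsOnly(sourceTable, mvSnapshot);
    if (Boolean.TRUE.equals(appendsOnly)) {
      // Only "append" snapshots since the MV snapshot: rebuild incrementally by
      // scanning the rows added after mvSnapshot (fromVersion on the table scan).
      planIncrementalRebuild(sourceTable, mvSnapshot.getSnapshotId());
    } else {
      // FALSE (deletes/overwrites happened) or null (snapshot not found):
      // fall back to a full rebuild (insert overwrite of the MV).
      planFullRebuild(sourceTable);
    }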

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 2b64c79160c..ff1f6bb0a38 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -42,6 +42,7 @@ public class InputFormatConfig {
   public static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
   public static final String READ_SCHEMA = "iceberg.mr.read.schema";
   public static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id";
+  public static final String SNAPSHOT_ID_INTERVAL_FROM = "iceberg.mr.snapshot.id.interval.from";
   public static final String SPLIT_SIZE = "iceberg.mr.split.size";
   public static final String SCHEMA_AUTO_CONVERSION = "iceberg.mr.schema.auto.conversion";
   public static final String TABLE_IDENTIFIER = "iceberg.mr.table.identifier";
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 7a691905a2a..57c60f28cf2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -142,6 +142,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
             job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false));
     job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
+    job.set(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, job.get(TableScanDesc.FROM_VERSION, "-1"));
 
     String location = job.get(InputFormatConfig.TABLE_LOCATION);
     return Arrays.stream(super.getSplits(job, numSplits))
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index d3ccd7b84ee..b9ff017ac66 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -1179,4 +1179,28 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     }
     return COPY_ON_WRITE.equalsIgnoreCase(mode);
   }
+
+  @Override
+  public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
+    TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+    Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+    boolean foundSince = false;
+    for (Snapshot snapshot : table.snapshots()) {
+      if (!foundSince) {
+        if (snapshot.snapshotId() == since.getSnapshotId()) {
+          foundSince = true;
+        }
+      } else {
+        if (!"append".equals(snapshot.operation())) {
+          return false;
+        }
+      }
+    }
+
+    if (foundSince) {
+      return true;
+    }
+
+    return null;
+  }
 }
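
Note that hasAppendsOnly() is deliberately three-valued: it returns TRUE when every snapshot after the given one is an "append", FALSE as soon as any other operation is found, and null when the given snapshot does not appear in the table history (for example because it has expired). A minimal sketch of how a caller might branch on the result (variable names are illustrative):

    Boolean result = storageHandler.hasAppendsOnly(hmsTable, since);
    if (result == null) {
      // the snapshot recorded for the MV no longer exists: incremental rebuild cannot be attempted
    } else if (result) {
      // insert-only history since the MV snapshot: incremental rebuild is applicable
    } else {
      // a delete/overwrite/replace happened: a full rebuild is required
    }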
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index ae3b31563e8..d01e01ab034 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -47,9 +47,11 @@ import org.apache.iceberg.DataTableScan;
 import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Scan;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
@@ -109,8 +111,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
   }
 
   private static TableScan createTableScan(Table table, Configuration conf) {
-    TableScan scan = table.newScan()
-        .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
+    TableScan scan = table.newScan();
+
     long snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
     if (snapshotId != -1) {
       scan = scan.useSnapshot(snapshotId);
@@ -121,6 +123,23 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       scan = scan.asOfTime(asOfTime);
     }
 
+    return scan;
+  }
+
+  private static IncrementalAppendScan createIncrementalAppendScan(Table table, Configuration conf) {
+    long fromSnapshot = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
+    return table.newIncrementalAppendScan().fromSnapshotExclusive(fromSnapshot);
+  }
+
+  private static <
+          T extends Scan<T, FileScanTask, CombinedScanTask>> Scan<T,
+          FileScanTask,
+          CombinedScanTask> applyConfig(
+          Configuration conf, Scan<T, FileScanTask, CombinedScanTask> scanToConfigure) {
+
+    Scan<T, FileScanTask, CombinedScanTask> scan = scanToConfigure.caseSensitive(
+            conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
+
     long splitSize = conf.getLong(InputFormatConfig.SPLIT_SIZE, 0);
     if (splitSize > 0) {
       scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
@@ -154,7 +173,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       // On the execution side residual expressions will be mined from the passed job conf.
       scan = scan.filter(filter).ignoreResiduals();
     }
-
     return scan;
   }
 
@@ -165,12 +183,19 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
         .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
         .orElseGet(() -> Catalogs.loadTable(conf));
 
-    TableScan scan = createTableScan(table, conf);
-
     List<InputSplit> splits = Lists.newArrayList();
     boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
     InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
+
+    long fromVersion = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
+    Scan<?, FileScanTask, CombinedScanTask> scan;
+    if (fromVersion != -1) {
+      scan = applyConfig(conf, createIncrementalAppendScan(table, conf));
+    } else {
+      scan = applyConfig(conf, createTableScan(table, conf));
+    }
+
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
       Table serializableTable = SerializableTable.copyOf(table);
       tasksIterable.forEach(task -> {
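
In short, split planning now picks between two scan types: the regular TableScan (optionally pinned to a snapshot id or timestamp) and an IncrementalAppendScan bounded below by the from-version, with applyConfig() applying the shared options (case sensitivity, split size, column projection, residual filtering) to either one. The selection, restated from the hunk above with comments:

    long fromVersion = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
    Scan<?, FileScanTask, CombinedScanTask> scan;
    if (fromVersion != -1) {
      // incremental rebuild: plan only the files appended after fromVersion
      scan = applyConfig(conf, table.newIncrementalAppendScan().fromSnapshotExclusive(fromVersion));
    } else {
      // normal path: full scan, or time travel via SNAPSHOT_ID / AS_OF_TIMESTAMP
      scan = applyConfig(conf, createTableScan(table, conf));
    }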
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q
index 2400c9a6424..f0fd54ffbfe 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q
@@ -1,5 +1,6 @@
 -- MV data is stored by iceberg v1
 -- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
 
 set hive.explain.user=false;
 set hive.support.concurrency=true;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q
new file mode 100644
index 00000000000..494ea821a71
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q
@@ -0,0 +1,30 @@
+-- MV source tables are iceberg and the MV has an aggregate
+-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists tbl_ice;
+
+create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
+create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+
+insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c;
+
+-- insert some new values to the source tables
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+explain cbo
+alter materialized view mat1 rebuild;
+alter materialized view mat1 rebuild;
+
+select * from mat1;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc5.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc5.q
new file mode 100644
index 00000000000..dd55b918d5b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc5.q
@@ -0,0 +1,30 @@
+-- MV source tables are iceberg and the MV has aggregates. It also has avg, which is calculated from sum and count.
+-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists tbl_ice;
+
+create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
+create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+
+insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+create materialized view mat2 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f), count(tbl_ice_v2.f), avg(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c;
+
+-- insert some new records to the source tables
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+explain cbo
+alter materialized view mat2 rebuild;
+alter materialized view mat2 rebuild;
+
+select * from mat2;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
new file mode 100644
index 00000000000..ff5a113bd0b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
@@ -0,0 +1,30 @@
+-- MV source tables are iceberg and one of the source tables has a delete operation
+-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists tbl_ice;
+
+create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
+create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+
+insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c;
+
+-- delete some records from a source table
+delete from tbl_ice_v2 where d = 4;
+
+-- plan should be insert overwrite
+explain cbo
+alter materialized view mat1 rebuild;
+alter materialized view mat1 rebuild;
+
+select * from mat1;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out
index f97a61ac61a..a6fa7ea6276 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out
@@ -237,7 +237,7 @@ POSTHOOK: Output: default@mat1
 CBO PLAN:
 HiveProject(b=[$1], c=[$2])
   HiveFilter(condition=[>($2, 52)])
-    HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice])
+    HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
 
 PREHOOK: query: alter materialized view mat1 rebuild
 PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out
new file mode 100644
index 00000000000..d3d66befb77
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out
@@ -0,0 +1,130 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mat1
+POSTHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+CBO PLAN:
+HiveAggregate(group=[{0, 1}], agg#0=[sum($2)])
+  HiveProject($f0=[$0], $f1=[$1], $f2=[$2])
+    HiveUnion(all=[true])
+      HiveProject(b=[$0], c=[$1], $f2=[$2])
+        HiveAggregate(group=[{1, 2}], agg#0=[sum($4)])
+          HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[none], cost=[not available])
+            HiveProject(a=[$0], b=[$1], c=[$2])
+              HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))])
+                HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
+            HiveProject(d=[$0], f=[$2])
+              HiveFilter(condition=[IS NOT NULL($0)])
+                HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2], fromVersion=[#Masked#])
+      HiveProject(b=[$0], c=[$1], _c2=[$2])
+        HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1])
+
+PREHOOK: query: alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat1)default.mat1.FieldSchema(name:_c2, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat1.b EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), (mat1)default.mat1.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), (mat1)default.mat1.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: select * from mat1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mat1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from mat1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mat1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+five	54	108
+four	53	106
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc5.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc5.q.out
new file mode 100644
index 00000000000..e24319ae71a
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc5.q.out
@@ -0,0 +1,135 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: create materialized view mat2 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f), count(tbl_ice_v2.f), avg(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mat2
+POSTHOOK: query: create materialized view mat2 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f), count(tbl_ice_v2.f), avg(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mat2
+POSTHOOK: Lineage: mat2._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat2._c3 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat2._c4 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat2.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat2.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: explain cbo
+alter materialized view mat2 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat2
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat2
+POSTHOOK: query: explain cbo
+alter materialized view mat2 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat2
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat2
+CBO PLAN:
+HiveProject(b=[$0], c=[$1], _o__c2=[$2], _o__c3=[COALESCE($3, 0:BIGINT)], _o__c4=[/(CAST($2):DOUBLE, COALESCE($3, 0:BIGINT))])
+  HiveAggregate(group=[{0, 1}], agg#0=[sum($2)], agg#1=[sum($3)])
+    HiveProject($f0=[$0], $f1=[$1], $f2=[$2], $f3=[$3])
+      HiveUnion(all=[true])
+        HiveProject(b=[$0], c=[$1], $f2=[$2], $f3=[$3])
+          HiveAggregate(group=[{1, 2}], agg#0=[sum($4)], agg#1=[count($4)])
+            HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[none], cost=[not available])
+              HiveProject(a=[$0], b=[$1], c=[$2])
+                HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))])
+                  HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
+              HiveProject(d=[$0], f=[$2])
+                HiveFilter(condition=[IS NOT NULL($0)])
+                  HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2], fromVersion=[#Masked#])
+        HiveProject(b=[$0], c=[$1], _c2=[$2], _c3=[$3])
+          HiveTableScan(table=[[default, mat2]], table:alias=[default.mat2])
+
+PREHOOK: query: alter materialized view mat2 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat2
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat2
+POSTHOOK: query: alter materialized view mat2 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat2
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat2
+POSTHOOK: Lineage: mat2._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:_c2, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat2._c3 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:_c3, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat2._c4 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:_c2, type:bigint, comment:null), (mat2)default.mat2.FieldSchema(name:_c3, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat2.b EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), (mat2)default.mat2.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat2.c EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: select * from mat2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mat2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from mat2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mat2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+five	54	108	2	54.0
+four	53	106	2	53.0
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc6.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc6.q.out
new file mode 100644
index 00000000000..8c38b62f3c9
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc6.q.out
@@ -0,0 +1,111 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mat1
+POSTHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: delete from tbl_ice_v2 where d = 4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: delete from tbl_ice_v2 where d = 4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+CBO PLAN:
+HiveAggregate(group=[{1, 2}], agg#0=[sum($4)])
+  HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[none], cost=[not available])
+    HiveProject(a=[$0], b=[$1], c=[$2])
+      HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))])
+        HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice])
+    HiveProject(d=[$0], f=[$2])
+      HiveFilter(condition=[IS NOT NULL($0)])
+        HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2])
+
+PREHOOK: query: alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: select * from mat1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mat1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from mat1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mat1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+five	54	54
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/SerializableTable.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/SerializableTable.java
new file mode 100644
index 00000000000..14f5deadb91
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * A read-only serializable table that can be sent to other nodes in a cluster.
+ *
+ * <p>An instance of this class represents an immutable serializable copy of a table state and will
+ * not reflect any subsequent changes made to the original table.
+ *
+ * <p>While this class captures the metadata file location that can be used to load the complete
+ * table metadata, it directly persists the current schema, spec, sort order, and table properties to
+ * avoid reading the metadata file from other nodes for frequently needed metadata.
+ *
+ * <p>The implementation assumes the passed instances of {@link FileIO}, {@link EncryptionManager},
+ * {@link LocationProvider} are serializable. If you are serializing the table using a custom
+ * serialization framework like Kryo, those instances of {@link FileIO}, {@link EncryptionManager},
+ * {@link LocationProvider} must be supported by that particular serialization framework.
+ *
+ * <p><em>Note:</em> loading the complete metadata from a large number of nodes can overwhelm the
+ * storage.
+ */
+public class SerializableTable implements Table, Serializable {
+
+  private final String name;
+  private final String location;
+  private final String metadataFileLocation;
+  private final Map<String, String> properties;
+  private final String schemaAsJson;
+  private final int defaultSpecId;
+  private final Map<Integer, String> specAsJsonMap;
+  private final String sortOrderAsJson;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final LocationProvider locationProvider;
+  private final Map<String, SnapshotRef> refs;
+
+  private transient volatile Table lazyTable = null;
+  private transient volatile Schema lazySchema = null;
+  private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
+  private transient volatile SortOrder lazySortOrder = null;
+
+  protected SerializableTable(Table table) {
+    this.name = table.name();
+    this.location = table.location();
+    this.metadataFileLocation = metadataFileLocation(table);
+    this.properties = SerializableMap.copyOf(table.properties());
+    this.schemaAsJson = SchemaParser.toJson(table.schema());
+    this.defaultSpecId = table.spec().specId();
+    this.specAsJsonMap = Maps.newHashMap();
+    Map<Integer, PartitionSpec> specs = table.specs();
+    specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
+    this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+    this.io = fileIO(table);
+    this.encryption = table.encryption();
+    this.locationProvider = table.locationProvider();
+    this.refs = SerializableMap.copyOf(table.refs());
+  }
+
+  /**
+   * Creates a read-only serializable table that can be sent to other nodes in a cluster.
+   *
+   * @param table the original table to copy the state from
+   * @return a read-only serializable table reflecting the current state of the original table
+   */
+  public static Table copyOf(Table table) {
+    if (table instanceof BaseMetadataTable) {
+      return new SerializableMetadataTable((BaseMetadataTable) table);
+    } else {
+      return new SerializableTable(table);
+    }
+  }
+
+  private String metadataFileLocation(Table table) {
+    if (table instanceof HasTableOperations) {
+      TableOperations ops = ((HasTableOperations) table).operations();
+      return ops.current().metadataFileLocation();
+    } else {
+      return null;
+    }
+  }
+
+  private FileIO fileIO(Table table) {
+    if (table.io() instanceof HadoopConfigurable) {
+      ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
+    }
+
+    return table.io();
+  }
+
+  private Table lazyTable() {
+    if (lazyTable == null) {
+      synchronized (this) {
+        if (lazyTable == null) {
+          if (metadataFileLocation == null) {
+            throw new UnsupportedOperationException(
+                    "Cannot load metadata: metadata file location is null");
+          }
+
+          TableOperations ops =
+                  new StaticTableOperations(metadataFileLocation, io, locationProvider);
+          this.lazyTable = newTable(ops, name);
+        }
+      }
+    }
+
+    return lazyTable;
+  }
+
+  protected Table newTable(TableOperations ops, String tableName) {
+    return new BaseTable(ops, tableName);
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public String location() {
+    return location;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return properties;
+  }
+
+  @Override
+  public Schema schema() {
+    if (lazySchema == null) {
+      synchronized (this) {
+        if (lazySchema == null && lazyTable == null) {
+          // prefer parsing JSON as opposed to loading the metadata
+          this.lazySchema = SchemaParser.fromJson(schemaAsJson);
+        } else if (lazySchema == null) {
+          this.lazySchema = lazyTable.schema();
+        }
+      }
+    }
+
+    return lazySchema;
+  }
+
+  @Override
+  public Map<Integer, Schema> schemas() {
+    return lazyTable().schemas();
+  }
+
+  @Override
+  public PartitionSpec spec() {
+    return specs().get(defaultSpecId);
+  }
+
+  @Override
+  public Map<Integer, PartitionSpec> specs() {
+    if (lazySpecs == null) {
+      synchronized (this) {
+        if (lazySpecs == null && lazyTable == null) {
+          // prefer parsing JSON as opposed to loading the metadata
+          Map<Integer, PartitionSpec> specs = Maps.newHashMapWithExpectedSize(specAsJsonMap.size());
+          specAsJsonMap.forEach(
+              (specId, specAsJson) -> {
+                specs.put(specId, PartitionSpecParser.fromJson(schema(), specAsJson));
+              });
+          this.lazySpecs = specs;
+        } else if (lazySpecs == null) {
+          this.lazySpecs = lazyTable.specs();
+        }
+      }
+    }
+
+    return lazySpecs;
+  }
+
+  @Override
+  public SortOrder sortOrder() {
+    if (lazySortOrder == null) {
+      synchronized (this) {
+        if (lazySortOrder == null && lazyTable == null) {
+          // prefer parsing JSON as opposed to loading the metadata
+          this.lazySortOrder = SortOrderParser.fromJson(schema(), sortOrderAsJson);
+        } else if (lazySortOrder == null) {
+          this.lazySortOrder = lazyTable.sortOrder();
+        }
+      }
+    }
+
+    return lazySortOrder;
+  }
+
+  @Override
+  public Map<Integer, SortOrder> sortOrders() {
+    return lazyTable().sortOrders();
+  }
+
+  @Override
+  public FileIO io() {
+    return io;
+  }
+
+  @Override
+  public EncryptionManager encryption() {
+    return encryption;
+  }
+
+  @Override
+  public LocationProvider locationProvider() {
+    return locationProvider;
+  }
+
+  @Override
+  public List<StatisticsFile> statisticsFiles() {
+    return lazyTable().statisticsFiles();
+  }
+
+  @Override
+  public Map<String, SnapshotRef> refs() {
+    return refs;
+  }
+
+  @Override
+  public void refresh() {
+    throw new UnsupportedOperationException(errorMsg("refresh"));
+  }
+
+  @Override
+  public TableScan newScan() {
+    return lazyTable().newScan();
+  }
+
+  public IncrementalAppendScan newIncrementalAppendScan() {
+    return lazyTable().newIncrementalAppendScan();
+  }
+
+  @Override
+  public BatchScan newBatchScan() {
+    return lazyTable().newBatchScan();
+  }
+
+  @Override
+  public Snapshot currentSnapshot() {
+    return lazyTable().currentSnapshot();
+  }
+
+  @Override
+  public Snapshot snapshot(long snapshotId) {
+    return lazyTable().snapshot(snapshotId);
+  }
+
+  @Override
+  public Iterable<Snapshot> snapshots() {
+    return lazyTable().snapshots();
+  }
+
+  @Override
+  public List<HistoryEntry> history() {
+    return lazyTable().history();
+  }
+
+  @Override
+  public UpdateSchema updateSchema() {
+    throw new UnsupportedOperationException(errorMsg("updateSchema"));
+  }
+
+  @Override
+  public UpdatePartitionSpec updateSpec() {
+    throw new UnsupportedOperationException(errorMsg("updateSpec"));
+  }
+
+  @Override
+  public UpdateProperties updateProperties() {
+    throw new UnsupportedOperationException(errorMsg("updateProperties"));
+  }
+
+  @Override
+  public ReplaceSortOrder replaceSortOrder() {
+    throw new UnsupportedOperationException(errorMsg("replaceSortOrder"));
+  }
+
+  @Override
+  public UpdateLocation updateLocation() {
+    throw new UnsupportedOperationException(errorMsg("updateLocation"));
+  }
+
+  @Override
+  public AppendFiles newAppend() {
+    throw new UnsupportedOperationException(errorMsg("newAppend"));
+  }
+
+  @Override
+  public RewriteFiles newRewrite() {
+    throw new UnsupportedOperationException(errorMsg("newRewrite"));
+  }
+
+  @Override
+  public RewriteManifests rewriteManifests() {
+    throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
+  }
+
+  @Override
+  public OverwriteFiles newOverwrite() {
+    throw new UnsupportedOperationException(errorMsg("newOverwrite"));
+  }
+
+  @Override
+  public RowDelta newRowDelta() {
+    throw new UnsupportedOperationException(errorMsg("newRowDelta"));
+  }
+
+  @Override
+  public ReplacePartitions newReplacePartitions() {
+    throw new UnsupportedOperationException(errorMsg("newReplacePartitions"));
+  }
+
+  @Override
+  public DeleteFiles newDelete() {
+    throw new UnsupportedOperationException(errorMsg("newDelete"));
+  }
+
+  @Override
+  public UpdateStatistics updateStatistics() {
+    throw new UnsupportedOperationException(errorMsg("updateStatistics"));
+  }
+
+  @Override
+  public ExpireSnapshots expireSnapshots() {
+    throw new UnsupportedOperationException(errorMsg("expireSnapshots"));
+  }
+
+  @Override
+  public ManageSnapshots manageSnapshots() {
+    throw new UnsupportedOperationException(errorMsg("manageSnapshots"));
+  }
+
+  @Override
+  public Transaction newTransaction() {
+    throw new UnsupportedOperationException(errorMsg("newTransaction"));
+  }
+
+  private String errorMsg(String operation) {
+    return String.format("Operation %s is not supported after the table is serialized", operation);
+  }
+
+  public static class SerializableMetadataTable extends SerializableTable {
+    private final MetadataTableType type;
+    private final String baseTableName;
+
+    protected SerializableMetadataTable(BaseMetadataTable metadataTable) {
+      super(metadataTable);
+      this.type = metadataTable.metadataTableType();
+      this.baseTableName = metadataTable.table().name();
+    }
+
+    @Override
+    protected Table newTable(TableOperations ops, String tableName) {
+      return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type);
+    }
+
+    public MetadataTableType type() {
+      return type;
+    }
+  }
+
+  // captures the current state of a Hadoop configuration in a serializable manner
+  private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
+
+    private final Map<String, String> confAsMap;
+    private transient volatile Configuration conf = null;
+
+    SerializableConfSupplier(Configuration conf) {
+      this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
+      conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
+    }
+
+    @Override
+    public Configuration get() {
+      if (conf == null) {
+        synchronized (this) {
+          if (conf == null) {
+            Configuration newConf = new Configuration(false);
+            confAsMap.forEach(newConf::set);
+            this.conf = newConf;
+          }
+        }
+      }
+
+      return conf;
+    }
+  }
+}
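
The file above is a patched copy of Iceberg's SerializableTable; the method most relevant to this commit is the public newIncrementalAppendScan() passthrough, which delegates to the lazily loaded table just like newScan() does. A minimal, hedged usage sketch relying only on the methods shown above:

    // copyOf() produces a read-only, serializable snapshot of the table state;
    // mutating operations throw UnsupportedOperationException after serialization.
    Table serializable = SerializableTable.copyOf(table);
    IncrementalAppendScan incScan =
        ((SerializableTable) serializable).newIncrementalAppendScan();  // the passthrough shown above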
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 1ff486b264d..cd3a74f1dac 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -506,6 +506,7 @@ TOK_TRUNCATE;
 TOK_BUCKET;
 TOK_AS_OF_TIME;
 TOK_AS_OF_VERSION;
+TOK_FROM_VERSION;
 }
 
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
index 53726c93d5f..d6e28e5a685 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertInc
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializationRelMetadataProvider;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HivePushdownSnapshotFilterRule;
 import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.MaterializedViewRewritingRelVisitor;
 import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveIncrementalRelMdRowCount;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
@@ -244,6 +245,10 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
       // Optimize plan
       basePlan = executeProgram(basePlan, program.build(), mdProvider, executorProvider, singletonList(materialization));
 
+      program = new HepProgramBuilder();
+      generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST, HivePushdownSnapshotFilterRule.INSTANCE);
+      basePlan = executeProgram(basePlan, program.build(), mdProvider, executorProvider);
+
       perfLogger.perfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
 
       List<Table> materializedViewsUsedOriginalPlan = getMaterializedViewsUsed(calcitePreMVRewritingPlan);
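
The new pass runs a single rule, HivePushdownSnapshotFilterRule, as its own depth-first HEP program after the view-based rewriting step; judging by the new q.out files in this commit, its effect is that the snapshot predicate introduced during rewriting ends up as the fromVersion attribute on HiveTableScan rather than as a residual filter (the rule implementation itself is outside this excerpt). The pattern, restated from the hunk with comments:

    // dedicated single-rule pass, executed once the MV rewriting program has run
    HepProgramBuilder program = new HepProgramBuilder();
    generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST,
        HivePushdownSnapshotFilterRule.INSTANCE);
    basePlan = executeProgram(basePlan, program.build(), mdProvider, executorProvider);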
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 951e20687a0..fee00fa4d2f 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -993,6 +993,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     if (scanDesc.getAsOfVersion() != null) {
       jobConf.set(TableScanDesc.AS_OF_VERSION, scanDesc.getAsOfVersion());
     }
+
+    if (scanDesc.getVersionIntervalFrom() != null) {
+      jobConf.set(TableScanDesc.FROM_VERSION, scanDesc.getVersionIntervalFrom());
+    }
   }
 
   protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Path splitPath) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9e4cdddab8f..5eef538556e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.MaterializationSnapshot;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
@@ -106,6 +107,24 @@ import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.SourceTable;
+import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.PartitionDropOptions;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AllTableConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -114,7 +133,6 @@ import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -128,7 +146,6 @@ import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
 import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
 import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
 import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
@@ -137,7 +154,6 @@ import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
 import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
 import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
-import org.apache.hadoop.hive.metastore.api.GetTableRequest;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
 import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
@@ -147,7 +163,6 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.Materialization;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.PartitionSpec;
 import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
@@ -170,7 +185,6 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
-import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMNullablePool;
@@ -183,18 +197,6 @@ import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest;
 import org.apache.hadoop.hive.metastore.api.AbortCompactionRequest;
 import org.apache.hadoop.hive.metastore.api.AbortCompactResponse;
-import org.apache.hadoop.hive.ql.io.HdfsUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaException;
-import org.apache.hadoop.hive.metastore.HiveMetaHook;
-import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -2080,8 +2082,7 @@ public class Hive {
             } else {
               // Obtain additional information if we should try incremental rewriting / rebuild
               // We will not try partial rewriting if there were update/delete/compaction operations on source tables
-              Materialization invalidationInfo = getMSC().getMaterializationInvalidationInfo(
-                  materializedViewTable.getMVMetadata().creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY));
+              Materialization invalidationInfo = getMaterializationInvalidationInfo(materializedViewTable.getMVMetadata());
               if (invalidationInfo == null || invalidationInfo.isSourceTablesUpdateDeleteModified() ||
                   invalidationInfo.isSourceTablesCompacted()) {
                 // We ignore (as it did not meet the requirements), but we do not need to update it in the
@@ -2101,6 +2102,55 @@ public class Hive {
     }
   }
 
+  private Materialization getMaterializationInvalidationInfo(MaterializedViewMetadata metadata)
+      throws TException, HiveException {
+    Optional<SourceTable> first = metadata.getSourceTables().stream().findFirst();
+    if (!first.isPresent()) {
+      // This is unexpected: every MV must have at least one source table
+      Materialization materialization = new Materialization();
+      materialization.setSourceTablesCompacted(true);
+      materialization.setSourceTablesUpdateDeleteModified(true);
+      return materialization;
+    } else {
+      Table table = getTable(first.get().getTable().getDbName(), first.get().getTable().getTableName());
+      if (!(table.isNonNative() && table.getStorageHandler().areSnapshotsSupported())) {
+        // Mixing native and non-native ACID source tables is not supported. If the first source is native ACID,
+        // the rest are expected to be native ACID as well.
+        return getMSC().getMaterializationInvalidationInfo(
+                metadata.creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY));
+      }
+    }
+
+    MaterializationSnapshot mvSnapshot = MaterializationSnapshot.fromJson(metadata.creationMetadata.getValidTxnList());
+
+    boolean hasAppendsOnly = true;
+    for (SourceTable sourceTable : metadata.getSourceTables()) {
+      Table table = getTable(sourceTable.getTable().getDbName(), sourceTable.getTable().getTableName());
+      HiveStorageHandler storageHandler = table.getStorageHandler();
+      if (storageHandler == null) {
+        Materialization materialization = new Materialization();
+        materialization.setSourceTablesCompacted(true);
+        return materialization;
+      }
+      Boolean b = storageHandler.hasAppendsOnly(
+          table, mvSnapshot.getTableSnapshots().get(table.getFullyQualifiedName()));
+      if (b == null) {
+        Materialization materialization = new Materialization();
+        materialization.setSourceTablesCompacted(true);
+        return materialization;
+      } else if (!b) {
+        hasAppendsOnly = false;
+        break;
+      }
+    }
+    Materialization materialization = new Materialization();
+    // TODO: delete operations are not supported yet.
+    // Set sourceTablesCompacted to false once delete support is added.
+    materialization.setSourceTablesCompacted(!hasAppendsOnly);
+    materialization.setSourceTablesUpdateDeleteModified(!hasAppendsOnly);
+    return materialization;
+  }
+
   /**
    * Get the materialized views that have been enabled for rewriting from the
    * metastore. If the materialized view is in the cache, we do not need to
@@ -2178,8 +2228,7 @@ public class Hive {
           } else {
             // Obtain additional information if we should try incremental rewriting / rebuild
             // We will not try partial rewriting if there were update/delete/compaction operations on source tables
-            invalidationInfo = getMSC().getMaterializationInvalidationInfo(
-                    metadata.creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY));
+            invalidationInfo = getMaterializationInvalidationInfo(metadata);
             ignore = invalidationInfo == null || invalidationInfo.isSourceTablesCompacted();
           }
           if (ignore) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 44dfae08167..aff2f51cbc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -552,4 +552,8 @@ public interface HiveStorageHandler extends Configurable {
   default void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc,
       EnvironmentContext environmentContext) {
   }
+
+  /**
+   * Returns whether the table has only had append (insert) writes since the given snapshot:
+   * {@code true} for appends only, {@code false} if any other write operation happened,
+   * or {@code null} if the storage handler cannot determine this.
+   */
+  default Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
+    return null;
+  }
 }
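
A minimal sketch of how a snapshot-capable storage handler might implement the new
hasAppendsOnly hook (illustrative only, not from the patch; listSnapshotsSince and
isAppend are hypothetical helpers, and any real implementation may differ):

    // Returns TRUE only if every snapshot created after 'since' was a pure append,
    // FALSE if any other write operation happened, NULL when this cannot be decided.
    @Override
    public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
      if (since == null) {
        return null;                                 // no baseline snapshot recorded for this source table
      }
      Iterable<SnapshotContext> newer = listSnapshotsSince(hmsTable, since);   // hypothetical helper
      if (newer == null) {
        return null;                                 // snapshot history expired: cannot decide
      }
      for (SnapshotContext snapshot : newer) {
        if (!isAppend(snapshot)) {                   // hypothetical helper
          return false;                              // update/delete/overwrite seen since the MV snapshot
        }
      }
      return true;                                   // inserts only: incremental rebuild stays possible
    }
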
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index a9ecad39ace..d422fbf3603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.CreationMetadata;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -60,7 +59,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -134,6 +132,7 @@ public class Table implements Serializable {
    * The version of the table. For Iceberg tables this is the snapshotId.
    */
   private String asOfVersion = null;
+  private String versionIntervalFrom = null;
 
   /**
    * The version of the table at the given timestamp. The format will be parsed with
@@ -180,6 +179,7 @@ public class Table implements Serializable {
 
     newTab.setAsOfTimestamp(this.asOfTimestamp);
     newTab.setAsOfVersion(this.asOfVersion);
+    newTab.setVersionIntervalFrom(this.versionIntervalFrom);
 
     newTab.setMetaTable(this.getMetaTable());
     return newTab;
@@ -595,6 +595,9 @@ public class Table implements Serializable {
     if (!Objects.equals(asOfVersion, other.asOfVersion)) {
       return false;
     }
+    if (!Objects.equals(versionIntervalFrom, other.versionIntervalFrom)) {
+      return false;
+    }
     return true;
   }
 
@@ -1327,6 +1330,14 @@ public class Table implements Serializable {
     this.asOfVersion = asOfVersion;
   }
 
+  public String getVersionIntervalFrom() {
+    return versionIntervalFrom;
+  }
+
+  public void setVersionIntervalFrom(String versionIntervalFrom) {
+    this.versionIntervalFrom = versionIntervalFrom;
+  }
+
   public String getAsOfTimestamp() {
     return asOfTimestamp;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index ac075a52c32..8ffd41c49f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -58,6 +58,7 @@ public enum VirtualColumn {
   PARTITION_HASH("PARTITION__HASH", TypeInfoFactory.longTypeInfo),
   FILE_PATH("FILE__PATH", TypeInfoFactory.stringTypeInfo),
   ROW_POSITION("ROW__POSITION", TypeInfoFactory.longTypeInfo),
+  SNAPSHOT_ID("SNAPSHOT__ID", TypeInfoFactory.longTypeInfo),
 
   /**
    * GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
index dfeab795f26..26ceedac8c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
@@ -50,6 +50,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import com.google.common.collect.ImmutableSet;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 
 /**
  * Relational expression representing a scan of a HiveDB collection.
@@ -207,7 +209,9 @@ public class HiveTableScan extends TableScan implements HiveRelNode {
       .itemIf("plKey", ((RelOptHiveTable) table).getPartitionListKey(), pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES)
       .itemIf("table:alias", tblAlias, !this.useQBIdInDigest)
       .itemIf("tableScanTrait", this.tableScanTrait,
-          pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES);
+          pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES)
+      .itemIf("fromVersion", ((RelOptHiveTable) table).getHiveTableMD().getVersionIntervalFrom(),
+          isNotBlank(((RelOptHiveTable) table).getHiveTableMD().getVersionIntervalFrom()));
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentSnapshotMaterializationRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentSnapshotMaterializationRule.java
new file mode 100644
index 00000000000..26ff609ab6e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentSnapshotMaterializationRule.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBeans;
+import org.apache.hadoop.hive.common.type.SnapshotContext;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This rule will rewrite the materialized view with information about
+ * its invalidation data. In particular, if any of the tables used by the
+ * materialization has been updated since the materialization was created,
+ * it will introduce a filter operator on top of that table in the materialization
+ * definition, making explicit the data contained in it so the rewriting
+ * algorithm can use this information to rewrite the query as a combination of the
+ * outdated materialization data and the new original data in the source tables.
+ * If the data in the source table matches the current data in the snapshot,
+ * no filter is created.
+ * For tables that support snapshots, the filtering should be performed in the
+ * TableScan operator so that records are read only from the relevant snapshots.
+ * However, the union rewrite algorithm needs a so-called compensation predicate in
+ * a Filter operator to build the union branch that produces the delta records.
+ * After the union rewrite algorithm has been executed, the predicates on snapshot ids
+ * are pushed down to the corresponding TableScan operator and removed from the Filter
+ * operator, so the reference to {@link VirtualColumn#SNAPSHOT_ID} is only temporary in the
+ * logical plan.
+ *
+ * @see HivePushdownSnapshotFilterRule
+ */
+public class HiveAugmentSnapshotMaterializationRule extends RelRule<HiveAugmentSnapshotMaterializationRule.Config> {
+
+  public static RelOptRule with(Map<String, SnapshotContext> mvMetaStoredSnapshot) {
+    return RelRule.Config.EMPTY.as(HiveAugmentSnapshotMaterializationRule.Config.class)
+            .withMvMetaStoredSnapshot(mvMetaStoredSnapshot)
+            .withRelBuilderFactory(HiveRelFactories.HIVE_BUILDER)
+            .withOperandSupplier(operandBuilder -> operandBuilder.operand(TableScan.class).anyInputs())
+            .withDescription("HiveAugmentSnapshotMaterializationRule")
+            .toRule();
+  }
+
+  public interface Config extends RelRule.Config {
+
+    HiveAugmentSnapshotMaterializationRule.Config withMvMetaStoredSnapshot(
+            Map<String, SnapshotContext> mvMetaStoredSnapshot);
+
+    @ImmutableBeans.Property
+    Map<String, SnapshotContext> getMvMetaStoredSnapshot();
+
+    @Override default HiveAugmentSnapshotMaterializationRule toRule() {
+      return new HiveAugmentSnapshotMaterializationRule(this);
+    }
+  }
+
+  private static RelDataType snapshotIdType = null;
+
+  private static RelDataType snapshotIdType(RelBuilder relBuilder) {
+    if (snapshotIdType == null) {
+      try {
+        snapshotIdType = relBuilder.getTypeFactory().createSqlType(
+            TypeConverter.convert(VirtualColumn.SNAPSHOT_ID.getTypeInfo(),
+                relBuilder.getTypeFactory()).getSqlTypeName());
+      } catch (CalciteSemanticException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    return snapshotIdType;
+  }
+
+  private final Set<RelNode> visited;
+  private final Map<String, SnapshotContext> mvMetaStoredSnapshot;
+
+  public HiveAugmentSnapshotMaterializationRule(Config config) {
+    super(config);
+    this.mvMetaStoredSnapshot = config.getMvMetaStoredSnapshot();
+    this.visited = new HashSet<>();
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final TableScan tableScan = call.rel(0);
+    if (!visited.add(tableScan)) {
+      // Already visited
+      return;
+    }
+
+    RelOptHiveTable hiveTable = (RelOptHiveTable) tableScan.getTable();
+    Table table = hiveTable.getHiveTableMD();
+
+    SnapshotContext mvMetaTableSnapshot = mvMetaStoredSnapshot.get(table.getFullyQualifiedName());
+    if (mvMetaTableSnapshot.equals(table.getStorageHandler().getCurrentSnapshotContext(table))) {
+      return;
+    }
+
+    table.setVersionIntervalFrom(Long.toString(mvMetaTableSnapshot.getSnapshotId()));
+
+    RexBuilder rexBuilder = call.builder().getRexBuilder();
+    int snapshotIdIndex = tableScan.getTable().getRowType().getField(
+        VirtualColumn.SNAPSHOT_ID.getName(), false, false).getIndex();
+    RexNode snapshotIdInputRef = rexBuilder.makeInputRef(
+        tableScan.getTable().getRowType().getFieldList().get(snapshotIdIndex).getType(), snapshotIdIndex);
+
+    final RelBuilder relBuilder = call.builder();
+    relBuilder.push(tableScan);
+    List<RexNode> conds = new ArrayList<>();
+    final RexNode literalHighWatermark = rexBuilder.makeLiteral(
+        mvMetaTableSnapshot.getSnapshotId(), snapshotIdType(relBuilder), false);
+    conds.add(
+        rexBuilder.makeCall(
+            SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+            ImmutableList.of(snapshotIdInputRef, literalHighWatermark)));
+    relBuilder.filter(conds);
+    call.transformTo(relBuilder.build());
+  }
+}
\ No newline at end of file
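
For context, the rule is meant to be applied to the materialization's definition plan through a
HepPlanner, just like the applyRule helper added to HiveMaterializedViewUtils below; a minimal
sketch, assuming storedSnapshots is the snapshot map recorded in the MV metadata
(MaterializationSnapshot.getTableSnapshots()):

    // Augments the MV definition plan with "SNAPSHOT__ID <= <stored snapshot id>" filters.
    static RelNode augmentWithSnapshots(RelNode mvQueryPlan, Map<String, SnapshotContext> storedSnapshots) {
      HepProgramBuilder program = new HepProgramBuilder();
      program.addRuleInstance(HiveAugmentSnapshotMaterializationRule.with(storedSnapshots));
      HepPlanner planner = new HepPlanner(program.build());
      planner.setRoot(mvQueryPlan);
      return planner.findBestExp();
    }
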
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java
index 4767b08f841..b00067f9fd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java
@@ -32,6 +32,7 @@ import org.apache.calcite.adapter.druid.DruidQuery;
 import org.apache.calcite.interpreter.BindableConvention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.TxnIdUtils;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -236,17 +238,26 @@ public class HiveMaterializedViewUtils {
       HiveRelOptMaterialization materialization, String validTxnsList,
       MaterializationSnapshot snapshot) throws LockException {
 
+    RelNode modifiedQueryRel;
     if (snapshot != null && snapshot.getTableSnapshots() != null && !snapshot.getTableSnapshots().isEmpty()) {
-      // Not supported yet for Iceberg tables
-      return materialization;
+      modifiedQueryRel = applyRule(
+              materialization.queryRel, HiveAugmentSnapshotMaterializationRule.with(snapshot.getTableSnapshots()));
+    } else {
+      String materializationTxnList = snapshot != null ? snapshot.getValidTxnList() : null;
+      modifiedQueryRel = augmentMaterializationWithTimeInformation(
+              materialization, validTxnsList, new ValidTxnWriteIdList(materializationTxnList));
     }
 
-    String materializationTxnList = snapshot != null ? snapshot.getValidTxnList() : null;
-    return augmentMaterializationWithTimeInformation(
-        materialization, validTxnsList, new ValidTxnWriteIdList(materializationTxnList));
+    return new HiveRelOptMaterialization(materialization.tableRel, modifiedQueryRel,
+            null, materialization.qualifiedTableName, materialization.getScope(), materialization.getRebuildMode(),
+            materialization.getAst());
   }
 
-  private static HiveRelOptMaterialization augmentMaterializationWithTimeInformation(
+  /**
+   * Method to enrich the materialization query contained in the input with
+   * information about its invalidation, used when the materialization has native ACID source tables.
+   */
+  private static RelNode augmentMaterializationWithTimeInformation(
       HiveRelOptMaterialization materialization, String validTxnsList,
       ValidTxnWriteIdList materializationTxnList) throws LockException {
     // Extract tables used by the query which will in turn be used to generate
@@ -266,15 +277,22 @@ public class HiveMaterializedViewUtils {
         SessionState.get().getTxnMgr().getValidWriteIds(tablesUsed, validTxnsList);
     // Augment
     final RexBuilder rexBuilder = materialization.queryRel.getCluster().getRexBuilder();
-    final HepProgramBuilder augmentMaterializationProgram = new HepProgramBuilder()
-        .addRuleInstance(new HiveAugmentMaterializationRule(rexBuilder, currentTxnList, materializationTxnList));
-    final HepPlanner augmentMaterializationPlanner = new HepPlanner(
-        augmentMaterializationProgram.build());
-    augmentMaterializationPlanner.setRoot(materialization.queryRel);
-    final RelNode modifiedQueryRel = augmentMaterializationPlanner.findBestExp();
-    return new HiveRelOptMaterialization(materialization.tableRel, modifiedQueryRel,
-        null, materialization.qualifiedTableName, materialization.getScope(), materialization.getRebuildMode(),
-            materialization.getAst());
+    return applyRule(
+            materialization.queryRel, new HiveAugmentMaterializationRule(rexBuilder, currentTxnList, materializationTxnList));
+  }
+
+  /**
+   * Method to apply a rule to a query plan.
+   */
+  @NotNull
+  private static RelNode applyRule(
+          RelNode basePlan, RelOptRule relOptRule) {
+    final HepProgramBuilder programBuilder = new HepProgramBuilder();
+    programBuilder.addRuleInstance(relOptRule);
+    final HepPlanner planner = new HepPlanner(
+        programBuilder.build());
+    planner.setRoot(basePlan);
+    return planner.findBestExp();
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HivePushdownSnapshotFilterRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HivePushdownSnapshotFilterRule.java
new file mode 100644
index 00000000000..2de1046d219
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HivePushdownSnapshotFilterRule.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexTableInputRef;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+
+import java.util.Set;
+
+/**
+ * Calcite rule to push down predicates contains {@link VirtualColumn#SNAPSHOT_ID} reference to TableScan.
+ * <p>
+ * This rule traverses the logical expression in {@link HiveFilter} operators and searches for
+ * predicates like
+ * <p>
+ * <code>
+ *   snapshotId &lt;= 12345677899
+ * </code>
+ * <p>
+ * The snapshot id literal is then stored in the {@link RelOptHiveTable#getHiveTableMD()} object wrapped by
+ * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan}
+ * and the original predicate in the {@link HiveFilter} is replaced with literal true.
+ *
+ * @see HiveAugmentSnapshotMaterializationRule
+ */
+public class HivePushdownSnapshotFilterRule extends RelRule<HivePushdownSnapshotFilterRule.Config> {
+
+  public static final RelOptRule INSTANCE =
+          RelRule.Config.EMPTY.as(HivePushdownSnapshotFilterRule.Config.class)
+            .withRelBuilderFactory(HiveRelFactories.HIVE_BUILDER)
+            .withOperandSupplier(operandBuilder -> operandBuilder.operand(HiveFilter.class).anyInputs())
+            .withDescription("HivePushdownSnapshotFilterRule")
+            .toRule();
+
+  public interface Config extends RelRule.Config {
+    @Override
+    default HivePushdownSnapshotFilterRule toRule() {
+      return new HivePushdownSnapshotFilterRule(this);
+    }
+  }
+
+  private HivePushdownSnapshotFilterRule(Config config) {
+    super(config);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    HiveFilter filter = call.rel(0);
+    RexNode newCondition = filter.getCondition().accept(new SnapshotIdShuttle(call.builder().getRexBuilder(), call.getMetadataQuery(), filter));
+    call.transformTo(call.builder().push(filter.getInput()).filter(newCondition).build());
+  }
+
+  static class SnapshotIdShuttle extends RexShuttle {
+
+    private final RexBuilder rexBuilder;
+    private final RelMetadataQuery metadataQuery;
+    private final RelNode startNode;
+
+    public SnapshotIdShuttle(RexBuilder rexBuilder, RelMetadataQuery metadataQuery, RelNode startNode) {
+      this.rexBuilder = rexBuilder;
+      this.metadataQuery = metadataQuery;
+      this.startNode = startNode;
+    }
+
+    @Override
+    public RexNode visitCall(RexCall call) {
+      if (call.operands.size() == 2 &&
+              (setSnapShotId(call.operands.get(0), call.operands.get(1)) ||
+                      setSnapShotId(call.operands.get(1), call.operands.get(0)))) {
+        return rexBuilder.makeLiteral(true);
+      }
+
+      return super.visitCall(call);
+    }
+
+    private boolean setSnapShotId(RexNode op1, RexNode op2) {
+      if (!op1.isA(SqlKind.LITERAL)) {
+        return false;
+      }
+
+      RexLiteral literal = (RexLiteral) op1;
+      if (literal.getType().getSqlTypeName().getFamily() != SqlTypeFamily.NUMERIC) {
+        return false;
+      }
+
+      long snapshotId = literal.getValueAs(Long.class);
+
+      RelOptTable relOptTable = getRelOptTableOf(op2);
+      if (relOptTable == null) {
+        return false;
+      }
+
+      RelOptHiveTable hiveTable = (RelOptHiveTable) relOptTable;
+      hiveTable.getHiveTableMD().setVersionIntervalFrom(Long.toString(snapshotId));
+      return true;
+    }
+
+    private RelOptTable getRelOptTableOf(RexNode rexNode) {
+      if (!(rexNode instanceof RexInputRef)) {
+        return null;
+      }
+
+      RexInputRef rexInputRef = (RexInputRef) rexNode;
+      Set<RexNode> rexNodeSet = metadataQuery.getExpressionLineage(startNode, rexInputRef);
+      if (rexNodeSet == null || rexNodeSet.size() != 1) {
+        return null;
+      }
+
+      RexNode resultRexNode = rexNodeSet.iterator().next();
+      if (!(resultRexNode instanceof RexTableInputRef)) {
+        return null;
+      }
+      RexTableInputRef tableInputRef = (RexTableInputRef) resultRexNode;
+
+      RelOptTable relOptTable = tableInputRef.getTableRef().getTable();
+      RelDataTypeField snapshotIdField = relOptTable.getRowType().getField(
+              VirtualColumn.SNAPSHOT_ID.getName(), false, false);
+      if (snapshotIdField == null) {
+        return null;
+      }
+
+      return snapshotIdField.getIndex() == tableInputRef.getIndex() ? relOptTable : null;
+    }
+  }
+}
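
A hedged sketch of the intended effect; the plan strings below are indicative only, not actual
EXPLAIN output, and the place where the rule is registered is elsewhere in this commit:

    //   before:  HiveFilter(condition=[<=($3, 12345677899)])   -- $3 references SNAPSHOT__ID
    //              HiveTableScan(table=[[db, tbl]])
    //   after:   HiveFilter(condition=[true])
    //              HiveTableScan(table=[[db, tbl]])             -- Table metadata now carries versionIntervalFrom=12345677899
    static RelNode pushDownSnapshotFilters(RelNode unionRewrittenPlan) {
      HepProgramBuilder program = new HepProgramBuilder();
      program.addRuleInstance(HivePushdownSnapshotFilterRule.INSTANCE);
      HepPlanner planner = new HepPlanner(program.build());
      planner.setRoot(unionRewrittenPlan);
      return planner.findBestExp();
    }
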
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 39ebf61d1a0..d8beb0cdb89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -99,6 +99,12 @@ public class ASTBuilder {
       b.add(asOfBuilder);
     }
 
+    if (hTbl.getHiveTableMD().getVersionIntervalFrom() != null) {
+      ASTBuilder fromVersionBuilder = ASTBuilder.construct(HiveParser.TOK_FROM_VERSION, "TOK_FROM_VERSION")
+          .add(HiveParser.Number, hTbl.getHiveTableMD().getVersionIntervalFrom());
+      b.add(fromVersionBuilder);
+    }
+
     ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST");
     if (scan instanceof DruidQuery) {
       //Passing query spec, column names and column types to be used as part of Hive Physical execution
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index f5d9a8d3cdd..36fd7e471c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -572,6 +572,7 @@ public abstract class BaseSemanticAnalyzer {
     int ssampleIndex = -1;
     int asOfTimeIndex = -1;
     int asOfVersionIndex = -1;
+    int asOfVersionFromIndex = -1;
     for (int index = 1; index < tabref.getChildCount(); index++) {
       ASTNode ct = (ASTNode) tabref.getChild(index);
       if (ct.getToken().getType() == HiveParser.TOK_TABLEBUCKETSAMPLE) {
@@ -584,11 +585,14 @@ public abstract class BaseSemanticAnalyzer {
         asOfTimeIndex = index;
       } else if (ct.getToken().getType() == HiveParser.TOK_AS_OF_VERSION) {
         asOfVersionIndex = index;
+      } else if (ct.getToken().getType() == HiveParser.TOK_FROM_VERSION) {
+        asOfVersionFromIndex = index;
       } else {
         aliasIndex = index;
       }
     }
-    return new int[] {aliasIndex, propsIndex, tsampleIndex, ssampleIndex, asOfTimeIndex, asOfVersionIndex};
+    return new int[] {
+        aliasIndex, propsIndex, tsampleIndex, ssampleIndex, asOfTimeIndex, asOfVersionIndex, asOfVersionFromIndex};
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index ac69a5e7595..d8850ffbafb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -353,6 +353,7 @@ import java.util.stream.IntStream;
 
 import javax.sql.DataSource;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.hadoop.hive.ql.optimizer.calcite.HiveMaterializedViewASTSubQueryRewriteShuttle.getMaterializedViewByAST;
 import static org.apache.hadoop.hive.ql.metadata.HiveRelOptMaterialization.RewriteAlgorithm.ANY;
 
@@ -2947,6 +2948,10 @@ public class CalcitePlanner extends SemanticAnalyzer {
           if (AcidUtils.isNonNativeAcidTable(tabMetaData, false)) {
             virtualCols.addAll(tabMetaData.getStorageHandler().acidVirtualColumns());
           }
+          if (tabMetaData.isNonNative() && tabMetaData.getStorageHandler().areSnapshotsSupported() &&
+              isBlank(tabMetaData.getMetaTable())) {
+            virtualCols.add(VirtualColumn.SNAPSHOT_ID);
+          }
           for (VirtualColumn vc : virtualCols) {
             colInfo = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true,
                 vc.getIsHidden());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
index 9caa0e7fb28..ca347878220 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
 import org.apache.hadoop.hive.ql.ddl.view.create.CreateMaterializedViewDesc;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -51,7 +50,7 @@ public class QB {
   private HashMap<String, QBExpr> aliasToSubqExpr;
   private HashMap<String, Table> viewAliasToViewSchema;
   private HashMap<String, Map<String, String>> aliasToProps;
-  private HashMap<String, Pair<String, String>> aliasToAsOf;
+  private HashMap<String, QBSystemVersion> aliasToSystemVersion;
   private List<String> aliases;
   private QBParseInfo qbp;
   private QBMetaData qbm;
@@ -134,7 +133,7 @@ public class QB {
     destToWindowingSpec = new LinkedHashMap<String, WindowingSpec>();
     id = getAppendedAliasFromId(outer_id, alias);
     aliasInsideView = new HashSet<>();
-    aliasToAsOf = new LinkedHashMap<>();
+    aliasToSystemVersion = new LinkedHashMap<>();
   }
 
   // For sub-queries, the id. and alias should be appended since same aliases can be re-used
@@ -195,8 +194,8 @@ public class QB {
     aliasToProps.put(alias.toLowerCase(), props);
   }
 
-  public void setAsOf(String alias, Pair<String, String> asOf) {
-    aliasToAsOf.put(alias.toLowerCase(), asOf);
+  public void setSystemVersion(String alias, QBSystemVersion asOf) {
+    aliasToSystemVersion.put(alias.toLowerCase(), asOf);
   }
 
   public void addAlias(String alias) {
@@ -257,8 +256,8 @@ public class QB {
     return aliasToProps.get(alias.toLowerCase());
   }
 
-  public Pair<String, String> getAsOfForAlias(String alias) {
-    return aliasToAsOf.get(alias.toLowerCase());
+  public QBSystemVersion getSystemVersionForAlias(String alias) {
+    return aliasToSystemVersion.get(alias.toLowerCase());
   }
 
   public void rewriteViewToSubq(String alias, String viewName, QBExpr qbexpr, Table tab) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSystemVersion.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSystemVersion.java
new file mode 100644
index 00000000000..e579e936fb3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSystemVersion.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+/**
+ * Holds the system version (time travel) specification parsed for a table reference:
+ * an AS OF version, a FROM version (the start of a version interval) or an AS OF timestamp.
+ */
+public class QBSystemVersion {
+  private final String asOfVersion;
+  private final String fromVersion;
+  private final String asOfTime;
+
+  public QBSystemVersion(String asOfVersion, String fromVersion, String asOfTime) {
+    this.asOfVersion = asOfVersion;
+    this.fromVersion = fromVersion;
+    this.asOfTime = asOfTime;
+  }
+
+  public String getAsOfTime() {
+    return asOfTime;
+  }
+
+  public String getAsOfVersion() {
+    return asOfVersion;
+  }
+
+  public String getFromVersion() {
+    return fromVersion;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 067c35dac40..0b51b46b1c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1112,6 +1112,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     int ssampleIndex = indexes[3];
     int asOfTimeIndex = indexes[4];
     int asOfVersionIndex = indexes[5];
+    int asOfVersionFromIndex = indexes[6];
 
     ASTNode tableTree = (ASTNode) (tabref.getChild(0));
 
@@ -1129,8 +1130,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       qb.setTabProps(alias, props);
     }
 
-    if (asOfTimeIndex != -1 || asOfVersionIndex != -1) {
+    if (asOfTimeIndex != -1 || asOfVersionIndex != -1 || asOfVersionFromIndex != -1) {
       String asOfVersion = asOfVersionIndex == -1 ? null : tabref.getChild(asOfVersionIndex).getChild(0).getText();
+      String asOfVersionFrom =
+          asOfVersionFromIndex == -1 ? null : tabref.getChild(asOfVersionFromIndex).getChild(0).getText();
       String asOfTime = null;
       
       if (asOfTimeIndex != -1) {
@@ -1143,8 +1146,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           asOfTime = stripQuotes(expr.getText());
         }
       }
-      Pair<String, String> asOf = Pair.of(asOfVersion, asOfTime);
-      qb.setAsOf(alias, asOf);
+      qb.setSystemVersion(alias, new QBSystemVersion(asOfVersion, asOfVersionFrom, asOfTime));
     }
 
     // If the alias is already there then we have a conflict
@@ -2268,13 +2270,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }
       }
 
-      Pair<String, String> asOf = qb.getAsOfForAlias(alias);
+      QBSystemVersion asOf = qb.getSystemVersionForAlias(alias);
       if (asOf != null) {
         if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) {
           throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias);
         }
-        tab.setAsOfVersion(asOf.getLeft());
-        tab.setAsOfTimestamp(asOf.getRight());
+        tab.setAsOfVersion(asOf.getAsOfVersion());
+        tab.setVersionIntervalFrom(asOf.getFromVersion());
+        tab.setAsOfTimestamp(asOf.getAsOfTime());
       }
 
       if (tab.isView()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 972a85ed454..9c45457b9e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -34,12 +34,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -115,6 +112,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
   public static final String AS_OF_VERSION =
       "hive.io.as.of.version";
 
+  public static final String FROM_VERSION =
+      "hive.io.version.from";
+
   // input file name (big) to bucket number
   private Map<String, Integer> bucketFileNameMapping;
 
@@ -140,6 +140,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
   private int numBuckets = -1;
 
   private String asOfVersion = null;
+  private String versionIntervalFrom = null;
 
   private String asOfTimestamp = null;
 
@@ -172,6 +173,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
       numBuckets = tblMetadata.getNumBuckets();
       asOfTimestamp = tblMetadata.getAsOfTimestamp();
       asOfVersion = tblMetadata.getAsOfVersion();
+      versionIntervalFrom = tblMetadata.getVersionIntervalFrom();
     }
     isTranscationalTable = AcidUtils.isTransactionalTable(this.tableMetadata);
     if (isTranscationalTable) {
@@ -531,6 +533,11 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
     return asOfVersion;
   }
 
+  @Explain(displayName = "Version interval from")
+  public String getVersionIntervalFrom() {
+    return versionIntervalFrom;
+  }
+
   @Explain(displayName = "As of timestamp")
   public String getAsOfTimestamp() {
     return asOfTimestamp;
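
One would expect the new FROM_VERSION key to be propagated to the job configuration the same way
AS_OF_VERSION is, so that snapshot-aware input formats can plan an incremental scan; a hypothetical
sketch only, the actual wiring is elsewhere in this commit and may differ:

    // Hypothetical propagation of the scan's version interval start into the job configuration.
    if (tableScanDesc.getVersionIntervalFrom() != null) {
      jobConf.set(TableScanDesc.FROM_VERSION, tableScanDesc.getVersionIntervalFrom());
    }
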
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java b/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
index 40daba97e6b..7d2080fb917 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
@@ -63,4 +63,11 @@ public class SnapshotContext {
   public int hashCode() {
     return Objects.hash(snapshotId);
   }
+
+  @Override
+  public String toString() {
+    return "SnapshotContext{" +
+        "snapshotId=" + snapshotId +
+        '}';
+  }
 }