Posted to commits@hive.apache.org by kr...@apache.org on 2023/03/14 11:31:09 UTC
[hive] branch master updated: HIVE-27101: Support incremental materialized view rebuild when Iceberg source tables have insert operation only. (Krisztian Kasa, reviewed by Denys Kuzmenko, Aman Sinha)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1c5d9b90d96 HIVE-27101: Support incremental materialized view rebuild when Iceberg source tables have insert operation only. (Krisztian Kasa, reviewed by Denys Kuzmenko, Aman Sinha)
1c5d9b90d96 is described below
commit 1c5d9b90d960af8b84a1278e2faf8073a352c191
Author: Krisztian Kasa <ka...@gmail.com>
AuthorDate: Tue Mar 14 12:30:50 2023 +0100
HIVE-27101: Support incremental materialized view rebuild when Iceberg source tables have insert operation only. (Krisztian Kasa, reviewed by Denys Kuzmenko, Aman Sinha)
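In short: if every snapshot added to the Iceberg source tables since the last rebuild is an append, the rebuild is planned incrementally (the table scans carry a fromVersion and are executed as an incremental append scan, and the new rows are unioned with the existing MV contents); any other operation, such as a delete, falls back to a full insert-overwrite rebuild. A minimal usage sketch, condensed from the qtests added in this change (mv_iceberg_orc4.q; table and view names come from those tests):

    create materialized view mat1 as
    select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
    from tbl_ice join tbl_ice_v2 on tbl_ice.a = tbl_ice_v2.d
    where tbl_ice.c > 52
    group by tbl_ice.b, tbl_ice.c;

    -- append-only changes to a source table ...
    insert into tbl_ice values (2, 'two', 51), (3, 'three', 52);

    -- ... let the rebuild be planned incrementally instead of as a full recompute
    alter materialized view mat1 rebuild;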
---
.../org/apache/iceberg/mr/InputFormatConfig.java | 1 +
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 1 +
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 24 ++
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 35 +-
.../src/test/queries/positive/mv_iceberg_orc2.q | 1 +
.../src/test/queries/positive/mv_iceberg_orc4.q | 30 ++
.../src/test/queries/positive/mv_iceberg_orc5.q | 30 ++
.../src/test/queries/positive/mv_iceberg_orc6.q | 30 ++
.../test/results/positive/mv_iceberg_orc2.q.out | 2 +-
.../test/results/positive/mv_iceberg_orc4.q.out | 130 +++++++
.../test/results/positive/mv_iceberg_orc5.q.out | 135 +++++++
.../test/results/positive/mv_iceberg_orc6.q.out | 111 ++++++
.../java/org/apache/iceberg/SerializableTable.java | 420 +++++++++++++++++++++
.../org/apache/hadoop/hive/ql/parse/HiveParser.g | 1 +
.../AlterMaterializedViewRebuildAnalyzer.java | 5 +
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 4 +
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 91 +++--
.../hive/ql/metadata/HiveStorageHandler.java | 4 +
.../org/apache/hadoop/hive/ql/metadata/Table.java | 15 +-
.../hadoop/hive/ql/metadata/VirtualColumn.java | 1 +
.../calcite/reloperators/HiveTableScan.java | 6 +-
.../HiveAugmentSnapshotMaterializationRule.java | 152 ++++++++
.../rules/views/HiveMaterializedViewUtils.java | 48 ++-
.../views/HivePushdownSnapshotFilterRule.java | 158 ++++++++
.../optimizer/calcite/translator/ASTBuilder.java | 6 +
.../hadoop/hive/ql/parse/BaseSemanticAnalyzer.java | 6 +-
.../hadoop/hive/ql/parse/CalcitePlanner.java | 5 +
.../java/org/apache/hadoop/hive/ql/parse/QB.java | 13 +-
.../hadoop/hive/ql/parse/QBSystemVersion.java | 43 +++
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 15 +-
.../apache/hadoop/hive/ql/plan/TableScanDesc.java | 13 +-
.../hadoop/hive/common/type/SnapshotContext.java | 7 +
32 files changed, 1481 insertions(+), 62 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 2b64c79160c..ff1f6bb0a38 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -42,6 +42,7 @@ public class InputFormatConfig {
public static final String IN_MEMORY_DATA_MODEL = "iceberg.mr.in.memory.data.model";
public static final String READ_SCHEMA = "iceberg.mr.read.schema";
public static final String SNAPSHOT_ID = "iceberg.mr.snapshot.id";
+ public static final String SNAPSHOT_ID_INTERVAL_FROM = "iceberg.mr.snapshot.id.interval.from";
public static final String SPLIT_SIZE = "iceberg.mr.split.size";
public static final String SCHEMA_AUTO_CONVERSION = "iceberg.mr.schema.auto.conversion";
public static final String TABLE_IDENTIFIER = "iceberg.mr.table.identifier";
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 7a691905a2a..57c60f28cf2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -142,6 +142,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
job.getBoolean(ColumnProjectionUtils.FETCH_VIRTUAL_COLUMNS_CONF_STR, false));
job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
+ job.set(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, job.get(TableScanDesc.FROM_VERSION, "-1"));
String location = job.get(InputFormatConfig.TABLE_LOCATION);
return Arrays.stream(super.getSplits(job, numSplits))
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index d3ccd7b84ee..b9ff017ac66 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -1179,4 +1179,28 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
return COPY_ON_WRITE.equalsIgnoreCase(mode);
}
+
+ @Override
+ public Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
+ TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+ Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+ boolean foundSince = false;
+ for (Snapshot snapshot : table.snapshots()) {
+ if (!foundSince) {
+ if (snapshot.snapshotId() == since.getSnapshotId()) {
+ foundSince = true;
+ }
+ } else {
+ if (!"append".equals(snapshot.operation())) {
+ return false;
+ }
+ }
+ }
+
+ if (foundSince) {
+ return true;
+ }
+
+ return null;
+ }
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index ae3b31563e8..d01e01ab034 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -47,9 +47,11 @@ import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
@@ -109,8 +111,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
}
private static TableScan createTableScan(Table table, Configuration conf) {
- TableScan scan = table.newScan()
- .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
+ TableScan scan = table.newScan();
+
long snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
if (snapshotId != -1) {
scan = scan.useSnapshot(snapshotId);
@@ -121,6 +123,23 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
scan = scan.asOfTime(asOfTime);
}
+ return scan;
+ }
+
+ private static IncrementalAppendScan createIncrementalAppendScan(Table table, Configuration conf) {
+ long fromSnapshot = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
+ return table.newIncrementalAppendScan().fromSnapshotExclusive(fromSnapshot);
+ }
+
+ private static <
+ T extends Scan<T, FileScanTask, CombinedScanTask>> Scan<T,
+ FileScanTask,
+ CombinedScanTask> applyConfig(
+ Configuration conf, Scan<T, FileScanTask, CombinedScanTask> scanToConfigure) {
+
+ Scan<T, FileScanTask, CombinedScanTask> scan = scanToConfigure.caseSensitive(
+ conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
+
long splitSize = conf.getLong(InputFormatConfig.SPLIT_SIZE, 0);
if (splitSize > 0) {
scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
@@ -154,7 +173,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
// On the execution side residual expressions will be mined from the passed job conf.
scan = scan.filter(filter).ignoreResiduals();
}
-
return scan;
}
@@ -165,12 +183,19 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
.ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
.orElseGet(() -> Catalogs.loadTable(conf));
- TableScan scan = createTableScan(table, conf);
-
List<InputSplit> splits = Lists.newArrayList();
boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
+
+ long fromVersion = conf.getLong(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, -1);
+ Scan<?, FileScanTask, CombinedScanTask> scan;
+ if (fromVersion != -1) {
+ scan = applyConfig(conf, createIncrementalAppendScan(table, conf));
+ } else {
+ scan = applyConfig(conf, createTableScan(table, conf));
+ }
+
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
Table serializableTable = SerializableTable.copyOf(table);
tasksIterable.forEach(task -> {
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q
index 2400c9a6424..f0fd54ffbfe 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc2.q
@@ -1,5 +1,6 @@
-- MV data is stored by iceberg v1
-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
set hive.explain.user=false;
set hive.support.concurrency=true;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q
new file mode 100644
index 00000000000..494ea821a71
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc4.q
@@ -0,0 +1,30 @@
+-- MV source tables are iceberg and MV has aggregate
+-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists tbl_ice;
+
+create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
+create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+
+insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c;
+
+-- insert some new values to the source tables
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+explain cbo
+alter materialized view mat1 rebuild;
+alter materialized view mat1 rebuild;
+
+select * from mat1;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc5.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc5.q
new file mode 100644
index 00000000000..dd55b918d5b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc5.q
@@ -0,0 +1,30 @@
+-- MV source tables are iceberg and MV has aggregate. It also has avg which is calculated from sum and count.
+-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists tbl_ice;
+
+create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
+create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+
+insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+create materialized view mat2 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f), count(tbl_ice_v2.f), avg(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c;
+
+-- insert some new records to the source tables
+insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+explain cbo
+alter materialized view mat2 rebuild;
+alter materialized view mat2 rebuild;
+
+select * from mat2;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
new file mode 100644
index 00000000000..ff5a113bd0b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/mv_iceberg_orc6.q
@@ -0,0 +1,30 @@
+-- MV source tables are iceberg and has delete operation
+-- SORT_QUERY_RESULTS
+--! qt:replace:/(.*fromVersion=\[)\S+(\].*)/$1#Masked#$2/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists tbl_ice;
+
+create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1');
+create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+
+insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54);
+insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54);
+
+create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c;
+
+-- delete some records from a source table
+delete from tbl_ice_v2 where d = 4;
+
+-- plan should be insert overwrite
+explain cbo
+alter materialized view mat1 rebuild;
+alter materialized view mat1 rebuild;
+
+select * from mat1;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out
index f97a61ac61a..a6fa7ea6276 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc2.q.out
@@ -237,7 +237,7 @@ POSTHOOK: Output: default@mat1
CBO PLAN:
HiveProject(b=[$1], c=[$2])
HiveFilter(condition=[>($2, 52)])
- HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice])
+ HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
PREHOOK: query: alter materialized view mat1 rebuild
PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out
new file mode 100644
index 00000000000..d3d66befb77
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc4.q.out
@@ -0,0 +1,130 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mat1
+POSTHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+CBO PLAN:
+HiveAggregate(group=[{0, 1}], agg#0=[sum($2)])
+ HiveProject($f0=[$0], $f1=[$1], $f2=[$2])
+ HiveUnion(all=[true])
+ HiveProject(b=[$0], c=[$1], $f2=[$2])
+ HiveAggregate(group=[{1, 2}], agg#0=[sum($4)])
+ HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[none], cost=[not available])
+ HiveProject(a=[$0], b=[$1], c=[$2])
+ HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))])
+ HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
+ HiveProject(d=[$0], f=[$2])
+ HiveFilter(condition=[IS NOT NULL($0)])
+ HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2], fromVersion=[#Masked#])
+ HiveProject(b=[$0], c=[$1], _c2=[$2])
+ HiveTableScan(table=[[default, mat1]], table:alias=[default.mat1])
+
+PREHOOK: query: alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat1
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat1
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat1)default.mat1.FieldSchema(name:_c2, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat1.b EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), (mat1)default.mat1.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), (mat1)default.mat1.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: select * from mat1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mat1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from mat1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mat1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+five 54 108
+four 53 106
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc5.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc5.q.out
new file mode 100644
index 00000000000..e24319ae71a
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc5.q.out
@@ -0,0 +1,135 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: create materialized view mat2 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f), count(tbl_ice_v2.f), avg(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mat2
+POSTHOOK: query: create materialized view mat2 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f), count(tbl_ice_v2.f), avg(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mat2
+POSTHOOK: Lineage: mat2._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat2._c3 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat2._c4 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat2.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat2.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (2, 'two', 51), (3, 'three', 52), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: explain cbo
+alter materialized view mat2 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat2
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat2
+POSTHOOK: query: explain cbo
+alter materialized view mat2 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat2
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat2
+CBO PLAN:
+HiveProject(b=[$0], c=[$1], _o__c2=[$2], _o__c3=[COALESCE($3, 0:BIGINT)], _o__c4=[/(CAST($2):DOUBLE, COALESCE($3, 0:BIGINT))])
+ HiveAggregate(group=[{0, 1}], agg#0=[sum($2)], agg#1=[sum($3)])
+ HiveProject($f0=[$0], $f1=[$1], $f2=[$2], $f3=[$3])
+ HiveUnion(all=[true])
+ HiveProject(b=[$0], c=[$1], $f2=[$2], $f3=[$3])
+ HiveAggregate(group=[{1, 2}], agg#0=[sum($4)], agg#1=[count($4)])
+ HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[none], cost=[not available])
+ HiveProject(a=[$0], b=[$1], c=[$2])
+ HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))])
+ HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice], fromVersion=[#Masked#])
+ HiveProject(d=[$0], f=[$2])
+ HiveFilter(condition=[IS NOT NULL($0)])
+ HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2], fromVersion=[#Masked#])
+ HiveProject(b=[$0], c=[$1], _c2=[$2], _c3=[$3])
+ HiveTableScan(table=[[default, mat2]], table:alias=[default.mat2])
+
+PREHOOK: query: alter materialized view mat2 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@mat2
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat2
+POSTHOOK: query: alter materialized view mat2 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@mat2
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat2
+POSTHOOK: Lineage: mat2._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:_c2, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat2._c3 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:_c3, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat2._c4 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:_c2, type:bigint, comment:null), (mat2)default.mat2.FieldSchema(name:_c3, type:bigint, comment:null), ]
+POSTHOOK: Lineage: mat2.b EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), (mat2)default.mat2.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat2.c EXPRESSION [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), (mat2)default.mat2.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: select * from mat2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mat2
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from mat2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mat2
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+five 54 108 2 54.0
+four 53 106 2 53.0
diff --git a/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc6.q.out b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc6.q.out
new file mode 100644
index 00000000000..8c38b62f3c9
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/mv_iceberg_orc6.q.out
@@ -0,0 +1,111 @@
+PREHOOK: query: drop table if exists tbl_ice
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_ice
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: create external table tbl_ice(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: create external table tbl_ice_v2(d int, e string, f int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice
+POSTHOOK: query: insert into tbl_ice values (1, 'one', 50), (4, 'four', 53), (5, 'five', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice
+PREHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: insert into tbl_ice_v2 values (1, 'one v2', 50), (4, 'four v2', 53), (5, 'five v2', 54)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mat1
+POSTHOOK: query: create materialized view mat1 as
+select tbl_ice.b, tbl_ice.c, sum(tbl_ice_v2.f)
+from tbl_ice
+join tbl_ice_v2 on tbl_ice.a=tbl_ice_v2.d where tbl_ice.c > 52
+group by tbl_ice.b, tbl_ice.c
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: delete from tbl_ice_v2 where d = 4
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@tbl_ice_v2
+POSTHOOK: query: delete from tbl_ice_v2 where d = 4
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@tbl_ice_v2
+PREHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: explain cbo
+alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+CBO PLAN:
+HiveAggregate(group=[{1, 2}], agg#0=[sum($4)])
+ HiveJoin(condition=[=($0, $3)], joinType=[inner], algorithm=[none], cost=[not available])
+ HiveProject(a=[$0], b=[$1], c=[$2])
+ HiveFilter(condition=[AND(>($2, 52), IS NOT NULL($0))])
+ HiveTableScan(table=[[default, tbl_ice]], table:alias=[tbl_ice])
+ HiveProject(d=[$0], f=[$2])
+ HiveFilter(condition=[IS NOT NULL($0)])
+ HiveTableScan(table=[[default, tbl_ice_v2]], table:alias=[tbl_ice_v2])
+
+PREHOOK: query: alter materialized view mat1 rebuild
+PREHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+PREHOOK: Input: default@tbl_ice
+PREHOOK: Input: default@tbl_ice_v2
+PREHOOK: Output: default@mat1
+POSTHOOK: query: alter materialized view mat1 rebuild
+POSTHOOK: type: ALTER_MATERIALIZED_VIEW_REBUILD
+POSTHOOK: Input: default@tbl_ice
+POSTHOOK: Input: default@tbl_ice_v2
+POSTHOOK: Output: default@mat1
+POSTHOOK: Lineage: mat1._c2 EXPRESSION [(tbl_ice_v2)tbl_ice_v2.FieldSchema(name:f, type:int, comment:null), ]
+POSTHOOK: Lineage: mat1.b SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:b, type:string, comment:null), ]
+POSTHOOK: Lineage: mat1.c SIMPLE [(tbl_ice)tbl_ice.FieldSchema(name:c, type:int, comment:null), ]
+PREHOOK: query: select * from mat1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mat1
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from mat1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mat1
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+five 54 54
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/SerializableTable.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/SerializableTable.java
new file mode 100644
index 00000000000..14f5deadb91
--- /dev/null
+++ b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+
+/**
+ * A read-only serializable table that can be sent to other nodes in a cluster.
+ *
+ * <p>An instance of this class represents an immutable serializable copy of a table state and will
+ * not reflect any subsequent changed made to the original table.
+ *
+ * <p>While this class captures the metadata file location that can be used to load the complete
+ * table metadata, it directly persists the current schema, spec, sort order, table properties to
+ * avoid reading the metadata file from other nodes for frequently needed metadata.
+ *
+ * <p>The implementation assumes the passed instances of {@link FileIO}, {@link EncryptionManager},
+ * {@link LocationProvider} are serializable. If you are serializing the table using a custom
+ * serialization framework like Kryo, those instances of {@link FileIO}, {@link EncryptionManager},
+ * {@link LocationProvider} must be supported by that particular serialization framework.
+ *
+ * <p><em>Note:</em> loading the complete metadata from a large number of nodes can overwhelm the
+ * storage.
+ */
+public class SerializableTable implements Table, Serializable {
+
+ private final String name;
+ private final String location;
+ private final String metadataFileLocation;
+ private final Map<String, String> properties;
+ private final String schemaAsJson;
+ private final int defaultSpecId;
+ private final Map<Integer, String> specAsJsonMap;
+ private final String sortOrderAsJson;
+ private final FileIO io;
+ private final EncryptionManager encryption;
+ private final LocationProvider locationProvider;
+ private final Map<String, SnapshotRef> refs;
+
+ private transient volatile Table lazyTable = null;
+ private transient volatile Schema lazySchema = null;
+ private transient volatile Map<Integer, PartitionSpec> lazySpecs = null;
+ private transient volatile SortOrder lazySortOrder = null;
+
+ protected SerializableTable(Table table) {
+ this.name = table.name();
+ this.location = table.location();
+ this.metadataFileLocation = metadataFileLocation(table);
+ this.properties = SerializableMap.copyOf(table.properties());
+ this.schemaAsJson = SchemaParser.toJson(table.schema());
+ this.defaultSpecId = table.spec().specId();
+ this.specAsJsonMap = Maps.newHashMap();
+ Map<Integer, PartitionSpec> specs = table.specs();
+ specs.forEach((specId, spec) -> specAsJsonMap.put(specId, PartitionSpecParser.toJson(spec)));
+ this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
+ this.io = fileIO(table);
+ this.encryption = table.encryption();
+ this.locationProvider = table.locationProvider();
+ this.refs = SerializableMap.copyOf(table.refs());
+ }
+
+ /**
+ * Creates a read-only serializable table that can be sent to other nodes in a cluster.
+ *
+ * @param table the original table to copy the state from
+ * @return a read-only serializable table reflecting the current state of the original table
+ */
+ public static Table copyOf(Table table) {
+ if (table instanceof BaseMetadataTable) {
+ return new SerializableMetadataTable((BaseMetadataTable) table);
+ } else {
+ return new SerializableTable(table);
+ }
+ }
+
+ private String metadataFileLocation(Table table) {
+ if (table instanceof HasTableOperations) {
+ TableOperations ops = ((HasTableOperations) table).operations();
+ return ops.current().metadataFileLocation();
+ } else {
+ return null;
+ }
+ }
+
+ private FileIO fileIO(Table table) {
+ if (table.io() instanceof HadoopConfigurable) {
+ ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
+ }
+
+ return table.io();
+ }
+
+ private Table lazyTable() {
+ if (lazyTable == null) {
+ synchronized (this) {
+ if (lazyTable == null) {
+ if (metadataFileLocation == null) {
+ throw new UnsupportedOperationException(
+ "Cannot load metadata: metadata file location is null");
+ }
+
+ TableOperations ops =
+ new StaticTableOperations(metadataFileLocation, io, locationProvider);
+ this.lazyTable = newTable(ops, name);
+ }
+ }
+ }
+
+ return lazyTable;
+ }
+
+ protected Table newTable(TableOperations ops, String tableName) {
+ return new BaseTable(ops, tableName);
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public String location() {
+ return location;
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return properties;
+ }
+
+ @Override
+ public Schema schema() {
+ if (lazySchema == null) {
+ synchronized (this) {
+ if (lazySchema == null && lazyTable == null) {
+ // prefer parsing JSON as opposed to loading the metadata
+ this.lazySchema = SchemaParser.fromJson(schemaAsJson);
+ } else if (lazySchema == null) {
+ this.lazySchema = lazyTable.schema();
+ }
+ }
+ }
+
+ return lazySchema;
+ }
+
+ @Override
+ public Map<Integer, Schema> schemas() {
+ return lazyTable().schemas();
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ return specs().get(defaultSpecId);
+ }
+
+ @Override
+ public Map<Integer, PartitionSpec> specs() {
+ if (lazySpecs == null) {
+ synchronized (this) {
+ if (lazySpecs == null && lazyTable == null) {
+ // prefer parsing JSON as opposed to loading the metadata
+ Map<Integer, PartitionSpec> specs = Maps.newHashMapWithExpectedSize(specAsJsonMap.size());
+ specAsJsonMap.forEach(
+ (specId, specAsJson) -> {
+ specs.put(specId, PartitionSpecParser.fromJson(schema(), specAsJson));
+ });
+ this.lazySpecs = specs;
+ } else if (lazySpecs == null) {
+ this.lazySpecs = lazyTable.specs();
+ }
+ }
+ }
+
+ return lazySpecs;
+ }
+
+ @Override
+ public SortOrder sortOrder() {
+ if (lazySortOrder == null) {
+ synchronized (this) {
+ if (lazySortOrder == null && lazyTable == null) {
+ // prefer parsing JSON as opposed to loading the metadata
+ this.lazySortOrder = SortOrderParser.fromJson(schema(), sortOrderAsJson);
+ } else if (lazySortOrder == null) {
+ this.lazySortOrder = lazyTable.sortOrder();
+ }
+ }
+ }
+
+ return lazySortOrder;
+ }
+
+ @Override
+ public Map<Integer, SortOrder> sortOrders() {
+ return lazyTable().sortOrders();
+ }
+
+ @Override
+ public FileIO io() {
+ return io;
+ }
+
+ @Override
+ public EncryptionManager encryption() {
+ return encryption;
+ }
+
+ @Override
+ public LocationProvider locationProvider() {
+ return locationProvider;
+ }
+
+ @Override
+ public List<StatisticsFile> statisticsFiles() {
+ return lazyTable().statisticsFiles();
+ }
+
+ @Override
+ public Map<String, SnapshotRef> refs() {
+ return refs;
+ }
+
+ @Override
+ public void refresh() {
+ throw new UnsupportedOperationException(errorMsg("refresh"));
+ }
+
+ @Override
+ public TableScan newScan() {
+ return lazyTable().newScan();
+ }
+
+ public IncrementalAppendScan newIncrementalAppendScan() {
+ return lazyTable().newIncrementalAppendScan();
+ }
+
+ @Override
+ public BatchScan newBatchScan() {
+ return lazyTable().newBatchScan();
+ }
+
+ @Override
+ public Snapshot currentSnapshot() {
+ return lazyTable().currentSnapshot();
+ }
+
+ @Override
+ public Snapshot snapshot(long snapshotId) {
+ return lazyTable().snapshot(snapshotId);
+ }
+
+ @Override
+ public Iterable<Snapshot> snapshots() {
+ return lazyTable().snapshots();
+ }
+
+ @Override
+ public List<HistoryEntry> history() {
+ return lazyTable().history();
+ }
+
+ @Override
+ public UpdateSchema updateSchema() {
+ throw new UnsupportedOperationException(errorMsg("updateSchema"));
+ }
+
+ @Override
+ public UpdatePartitionSpec updateSpec() {
+ throw new UnsupportedOperationException(errorMsg("updateSpec"));
+ }
+
+ @Override
+ public UpdateProperties updateProperties() {
+ throw new UnsupportedOperationException(errorMsg("updateProperties"));
+ }
+
+ @Override
+ public ReplaceSortOrder replaceSortOrder() {
+ throw new UnsupportedOperationException(errorMsg("replaceSortOrder"));
+ }
+
+ @Override
+ public UpdateLocation updateLocation() {
+ throw new UnsupportedOperationException(errorMsg("updateLocation"));
+ }
+
+ @Override
+ public AppendFiles newAppend() {
+ throw new UnsupportedOperationException(errorMsg("newAppend"));
+ }
+
+ @Override
+ public RewriteFiles newRewrite() {
+ throw new UnsupportedOperationException(errorMsg("newRewrite"));
+ }
+
+ @Override
+ public RewriteManifests rewriteManifests() {
+ throw new UnsupportedOperationException(errorMsg("rewriteManifests"));
+ }
+
+ @Override
+ public OverwriteFiles newOverwrite() {
+ throw new UnsupportedOperationException(errorMsg("newOverwrite"));
+ }
+
+ @Override
+ public RowDelta newRowDelta() {
+ throw new UnsupportedOperationException(errorMsg("newRowDelta"));
+ }
+
+ @Override
+ public ReplacePartitions newReplacePartitions() {
+ throw new UnsupportedOperationException(errorMsg("newReplacePartitions"));
+ }
+
+ @Override
+ public DeleteFiles newDelete() {
+ throw new UnsupportedOperationException(errorMsg("newDelete"));
+ }
+
+ @Override
+ public UpdateStatistics updateStatistics() {
+ throw new UnsupportedOperationException(errorMsg("updateStatistics"));
+ }
+
+ @Override
+ public ExpireSnapshots expireSnapshots() {
+ throw new UnsupportedOperationException(errorMsg("expireSnapshots"));
+ }
+
+ @Override
+ public ManageSnapshots manageSnapshots() {
+ throw new UnsupportedOperationException(errorMsg("manageSnapshots"));
+ }
+
+ @Override
+ public Transaction newTransaction() {
+ throw new UnsupportedOperationException(errorMsg("newTransaction"));
+ }
+
+ private String errorMsg(String operation) {
+ return String.format("Operation %s is not supported after the table is serialized", operation);
+ }
+
+ public static class SerializableMetadataTable extends SerializableTable {
+ private final MetadataTableType type;
+ private final String baseTableName;
+
+ protected SerializableMetadataTable(BaseMetadataTable metadataTable) {
+ super(metadataTable);
+ this.type = metadataTable.metadataTableType();
+ this.baseTableName = metadataTable.table().name();
+ }
+
+ @Override
+ protected Table newTable(TableOperations ops, String tableName) {
+ return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type);
+ }
+
+ public MetadataTableType type() {
+ return type;
+ }
+ }
+
+ // captures the current state of a Hadoop configuration in a serializable manner
+ private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
+
+ private final Map<String, String> confAsMap;
+ private transient volatile Configuration conf = null;
+
+ SerializableConfSupplier(Configuration conf) {
+ this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
+ conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
+ }
+
+ @Override
+ public Configuration get() {
+ if (conf == null) {
+ synchronized (this) {
+ if (conf == null) {
+ Configuration newConf = new Configuration(false);
+ confAsMap.forEach(newConf::set);
+ this.conf = newConf;
+ }
+ }
+ }
+
+ return conf;
+ }
+ }
+}
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 1ff486b264d..cd3a74f1dac 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -506,6 +506,7 @@ TOK_TRUNCATE;
TOK_BUCKET;
TOK_AS_OF_TIME;
TOK_AS_OF_VERSION;
+TOK_FROM_VERSION;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
index 53726c93d5f..d6e28e5a685 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/alter/rebuild/AlterMaterializedViewRebuildAnalyzer.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveJoinInsertInc
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializationRelMetadataProvider;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HiveMaterializedViewUtils;
+import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.HivePushdownSnapshotFilterRule;
import org.apache.hadoop.hive.ql.optimizer.calcite.rules.views.MaterializedViewRewritingRelVisitor;
import org.apache.hadoop.hive.ql.optimizer.calcite.stats.HiveIncrementalRelMdRowCount;
import org.apache.hadoop.hive.ql.parse.ASTNode;
@@ -244,6 +245,10 @@ public class AlterMaterializedViewRebuildAnalyzer extends CalcitePlanner {
// Optimize plan
basePlan = executeProgram(basePlan, program.build(), mdProvider, executorProvider, singletonList(materialization));
+ program = new HepProgramBuilder();
+ generatePartialProgram(program, false, HepMatchOrder.DEPTH_FIRST, HivePushdownSnapshotFilterRule.INSTANCE);
+ basePlan = executeProgram(basePlan, program.build(), mdProvider, executorProvider);
+
perfLogger.perfLogEnd(this.getClass().getName(), PerfLogger.OPTIMIZER, "Calcite: View-based rewriting");
List<Table> materializedViewsUsedOriginalPlan = getMaterializedViewsUsed(calcitePreMVRewritingPlan);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 951e20687a0..fee00fa4d2f 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -993,6 +993,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
if (scanDesc.getAsOfVersion() != null) {
jobConf.set(TableScanDesc.AS_OF_VERSION, scanDesc.getAsOfVersion());
}
+
+ if (scanDesc.getVersionIntervalFrom() != null) {
+ jobConf.set(TableScanDesc.FROM_VERSION, scanDesc.getVersionIntervalFrom());
+ }
}
protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Path splitPath) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 9e4cdddab8f..5eef538556e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.MaterializationSnapshot;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
@@ -106,6 +107,24 @@ import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.SourceTable;
+import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.PartitionDropOptions;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.AllTableConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -114,7 +133,6 @@ import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -128,7 +146,6 @@ import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
-import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
@@ -137,7 +154,6 @@ import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
-import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
@@ -147,7 +163,6 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.Materialization;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.PartitionSpec;
import org.apache.hadoop.hive.metastore.api.PartitionWithoutSD;
@@ -170,7 +185,6 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
-import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMNullablePool;
@@ -183,18 +197,6 @@ import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest;
import org.apache.hadoop.hive.metastore.api.AbortCompactionRequest;
import org.apache.hadoop.hive.metastore.api.AbortCompactResponse;
-import org.apache.hadoop.hive.ql.io.HdfsUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaException;
-import org.apache.hadoop.hive.metastore.HiveMetaHook;
-import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.SynchronizedMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -2080,8 +2082,7 @@ public class Hive {
} else {
// Obtain additional information if we should try incremental rewriting / rebuild
// We will not try partial rewriting if there were update/delete/compaction operations on source tables
- Materialization invalidationInfo = getMSC().getMaterializationInvalidationInfo(
- materializedViewTable.getMVMetadata().creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY));
+ Materialization invalidationInfo = getMaterializationInvalidationInfo(materializedViewTable.getMVMetadata());
if (invalidationInfo == null || invalidationInfo.isSourceTablesUpdateDeleteModified() ||
invalidationInfo.isSourceTablesCompacted()) {
// We ignore (as it did not meet the requirements), but we do not need to update it in the
@@ -2101,6 +2102,55 @@ public class Hive {
}
}
+ private Materialization getMaterializationInvalidationInfo(MaterializedViewMetadata metadata)
+ throws TException, HiveException {
+ Optional<SourceTable> first = metadata.getSourceTables().stream().findFirst();
+ if (!first.isPresent()) {
+ // This is unexpected: every MV must have at least one source table
+ Materialization materialization = new Materialization();
+ materialization.setSourceTablesCompacted(true);
+ materialization.setSourceTablesUpdateDeleteModified(true);
+ return materialization;
+ } else {
+ Table table = getTable(first.get().getTable().getDbName(), first.get().getTable().getTableName());
+ if (!(table.isNonNative() && table.getStorageHandler().areSnapshotsSupported())) {
+ // Mixing native and non-native ACID source tables is not supported. If the first source is a native ACID
+ // table, the rest are expected to be native ACID as well
+ return getMSC().getMaterializationInvalidationInfo(
+ metadata.creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY));
+ }
+ }
+
+ MaterializationSnapshot mvSnapshot = MaterializationSnapshot.fromJson(metadata.creationMetadata.getValidTxnList());
+
+ boolean hasAppendsOnly = true;
+ for (SourceTable sourceTable : metadata.getSourceTables()) {
+ Table table = getTable(sourceTable.getTable().getDbName(), sourceTable.getTable().getTableName());
+ HiveStorageHandler storageHandler = table.getStorageHandler();
+ if (storageHandler == null) {
+ Materialization materialization = new Materialization();
+ materialization.setSourceTablesCompacted(true);
+ return materialization;
+ }
+ Boolean b = storageHandler.hasAppendsOnly(
+ table, mvSnapshot.getTableSnapshots().get(table.getFullyQualifiedName()));
+ if (b == null) {
+ Materialization materialization = new Materialization();
+ materialization.setSourceTablesCompacted(true);
+ return materialization;
+ } else if (!b) {
+ hasAppendsOnly = false;
+ break;
+ }
+ }
+ Materialization materialization = new Materialization();
+ // TODO: delete operations are not supported yet.
+ // Set sourceTablesCompacted to false once deletes are supported
+ materialization.setSourceTablesCompacted(!hasAppendsOnly);
+ materialization.setSourceTablesUpdateDeleteModified(!hasAppendsOnly);
+ return materialization;
+ }
+
/**
* Get the materialized views that have been enabled for rewriting from the
* metastore. If the materialized view is in the cache, we do not need to
@@ -2178,8 +2228,7 @@ public class Hive {
} else {
// Obtain additional information if we should try incremental rewriting / rebuild
// We will not try partial rewriting if there were update/delete/compaction operations on source tables
- invalidationInfo = getMSC().getMaterializationInvalidationInfo(
- metadata.creationMetadata, conf.get(ValidTxnList.VALID_TXNS_KEY));
+ invalidationInfo = getMaterializationInvalidationInfo(metadata);
ignore = invalidationInfo == null || invalidationInfo.isSourceTablesCompacted();
}
if (ignore) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 44dfae08167..aff2f51cbc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -552,4 +552,8 @@ public interface HiveStorageHandler extends Configurable {
default void prepareAlterTableEnvironmentContext(AbstractAlterTableDesc alterTableDesc,
EnvironmentContext environmentContext) {
}
+
+ /** @return TRUE if the table had append operations only since the given snapshot, FALSE otherwise, NULL if unknown. */
+ default Boolean hasAppendsOnly(org.apache.hadoop.hive.ql.metadata.Table hmsTable, SnapshotContext since) {
+ return null;
+ }
}
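For orientation, a snapshot-capable handler can answer hasAppendsOnly by walking the snapshot ancestry back to the baseline recorded for the MV. The sketch below assumes an Iceberg-style API (Table, Snapshot, DataOperations) and is illustrative only; it is not necessarily how HiveIcebergStorageHandler implements the hook in this commit.

    import org.apache.hadoop.hive.common.type.SnapshotContext;
    import org.apache.iceberg.DataOperations;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    /** TRUE = appends only since the baseline, FALSE = other operations seen, NULL = cannot decide. */
    static Boolean hasAppendsOnlySince(Table icebergTable, SnapshotContext since) {
      if (since == null || icebergTable.currentSnapshot() == null) {
        return null;                                         // no baseline or empty table: undecidable
      }
      for (Snapshot s = icebergTable.currentSnapshot(); s != null;
           s = s.parentId() == null ? null : icebergTable.snapshot(s.parentId())) {
        if (s.snapshotId() == since.getSnapshotId()) {
          return true;                                       // reached the MV baseline, appends only
        }
        if (!DataOperations.APPEND.equals(s.operation())) {
          return false;                                      // update/delete/overwrite breaks incremental rebuild
        }
      }
      return null;                                           // baseline is not an ancestor (e.g. expired snapshot)
    }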
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index a9ecad39ace..d422fbf3603 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -60,7 +59,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -134,6 +132,7 @@ public class Table implements Serializable {
* The version of the table. For Iceberg tables this is the snapshotId.
*/
private String asOfVersion = null;
+ private String versionIntervalFrom = null;
/**
* The version of the table at the given timestamp. The format will be parsed with
@@ -180,6 +179,7 @@ public class Table implements Serializable {
newTab.setAsOfTimestamp(this.asOfTimestamp);
newTab.setAsOfVersion(this.asOfVersion);
+ newTab.setVersionIntervalFrom(this.versionIntervalFrom);
newTab.setMetaTable(this.getMetaTable());
return newTab;
@@ -595,6 +595,9 @@ public class Table implements Serializable {
if (!Objects.equals(asOfVersion, other.asOfVersion)) {
return false;
}
+ if (!Objects.equals(versionIntervalFrom, other.versionIntervalFrom)) {
+ return false;
+ }
return true;
}
@@ -1327,6 +1330,14 @@ public class Table implements Serializable {
this.asOfVersion = asOfVersion;
}
+ public String getVersionIntervalFrom() {
+ return versionIntervalFrom;
+ }
+
+ public void setVersionIntervalFrom(String versionIntervalFrom) {
+ this.versionIntervalFrom = versionIntervalFrom;
+ }
+
public String getAsOfTimestamp() {
return asOfTimestamp;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
index ac075a52c32..8ffd41c49f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/VirtualColumn.java
@@ -58,6 +58,7 @@ public enum VirtualColumn {
PARTITION_HASH("PARTITION__HASH", TypeInfoFactory.longTypeInfo),
FILE_PATH("FILE__PATH", TypeInfoFactory.stringTypeInfo),
ROW_POSITION("ROW__POSITION", TypeInfoFactory.longTypeInfo),
+ SNAPSHOT_ID("SNAPSHOT__ID", TypeInfoFactory.longTypeInfo),
/**
* GROUPINGID is used with GROUP BY GROUPINGS SETS, ROLLUP and CUBE.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
index dfeab795f26..26ceedac8c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/reloperators/HiveTableScan.java
@@ -50,6 +50,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
/**
* Relational expression representing a scan of a HiveDB collection.
@@ -207,7 +209,9 @@ public class HiveTableScan extends TableScan implements HiveRelNode {
.itemIf("plKey", ((RelOptHiveTable) table).getPartitionListKey(), pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES)
.itemIf("table:alias", tblAlias, !this.useQBIdInDigest)
.itemIf("tableScanTrait", this.tableScanTrait,
- pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES);
+ pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES)
+ .itemIf("fromVersion", ((RelOptHiveTable) table).getHiveTableMD().getVersionIntervalFrom(),
+ isNotBlank(((RelOptHiveTable) table).getHiveTableMD().getVersionIntervalFrom()));
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentSnapshotMaterializationRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentSnapshotMaterializationRule.java
new file mode 100644
index 00000000000..26ff609ab6e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveAugmentSnapshotMaterializationRule.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBeans;
+import org.apache.hadoop.hive.common.type.SnapshotContext;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.translator.TypeConverter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This rule will rewrite the materialized view with information about
+ * its invalidation data. In particular, if any of the tables used by the
+ * materialization has been updated since the materialization was created,
+ * it will introduce a filter operator on top of that table in the materialization
+ * definition, making explicit the data contained in it so the rewriting
+ * algorithm can use this information to rewrite the query as a combination of the
+ * outdated materialization data and the new original data in the source tables.
+ * If the current snapshot of a source table matches the snapshot stored in the
+ * materialization metadata, no filter is created for that table.
+ * For tables that support snapshots, the filtering should be performed in the
+ * TableScan operator so that records are read only from the relevant snapshots.
+ * However, the union rewrite algorithm needs a so-called compensation predicate in
+ * a Filter operator to build the union branch that produces the delta records.
+ * After the union rewrite algorithm is executed, the predicates on snapshot ids
+ * are pushed down to the corresponding TableScan operator and removed from the Filter
+ * operator, so the reference to the {@link VirtualColumn#SNAPSHOT_ID} column is only
+ * temporary in the logical plan.
+ *
+ * @see HivePushdownSnapshotFilterRule
+ */
+public class HiveAugmentSnapshotMaterializationRule extends RelRule<HiveAugmentSnapshotMaterializationRule.Config> {
+
+ public static RelOptRule with(Map<String, SnapshotContext> mvMetaStoredSnapshot) {
+ return RelRule.Config.EMPTY.as(HiveAugmentSnapshotMaterializationRule.Config.class)
+ .withMvMetaStoredSnapshot(mvMetaStoredSnapshot)
+ .withRelBuilderFactory(HiveRelFactories.HIVE_BUILDER)
+ .withOperandSupplier(operandBuilder -> operandBuilder.operand(TableScan.class).anyInputs())
+ .withDescription("HiveAugmentSnapshotMaterializationRule")
+ .toRule();
+ }
+
+ public interface Config extends RelRule.Config {
+
+ HiveAugmentSnapshotMaterializationRule.Config withMvMetaStoredSnapshot(
+ Map<String, SnapshotContext> mvMetaStoredSnapshot);
+
+ @ImmutableBeans.Property
+ Map<String, SnapshotContext> getMvMetaStoredSnapshot();
+
+ @Override default HiveAugmentSnapshotMaterializationRule toRule() {
+ return new HiveAugmentSnapshotMaterializationRule(this);
+ }
+ }
+
+ private static RelDataType snapshotIdType = null;
+
+ private static RelDataType snapshotIdType(RelBuilder relBuilder) {
+ if (snapshotIdType == null) {
+ try {
+ snapshotIdType = relBuilder.getTypeFactory().createSqlType(
+ TypeConverter.convert(VirtualColumn.SNAPSHOT_ID.getTypeInfo(),
+ relBuilder.getTypeFactory()).getSqlTypeName());
+ } catch (CalciteSemanticException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return snapshotIdType;
+ }
+
+ private final Set<RelNode> visited;
+ private final Map<String, SnapshotContext> mvMetaStoredSnapshot;
+
+ public HiveAugmentSnapshotMaterializationRule(Config config) {
+ super(config);
+ this.mvMetaStoredSnapshot = config.getMvMetaStoredSnapshot();
+ this.visited = new HashSet<>();
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final TableScan tableScan = call.rel(0);
+ if (!visited.add(tableScan)) {
+ // Already visited
+ return;
+ }
+
+ RelOptHiveTable hiveTable = (RelOptHiveTable) tableScan.getTable();
+ Table table = hiveTable.getHiveTableMD();
+
+ SnapshotContext mvMetaTableSnapshot = mvMetaStoredSnapshot.get(table.getFullyQualifiedName());
+ if (mvMetaTableSnapshot.equals(table.getStorageHandler().getCurrentSnapshotContext(table))) {
+ return;
+ }
+
+ table.setVersionIntervalFrom(Long.toString(mvMetaTableSnapshot.getSnapshotId()));
+
+ RexBuilder rexBuilder = call.builder().getRexBuilder();
+ int snapshotIdIndex = tableScan.getTable().getRowType().getField(
+ VirtualColumn.SNAPSHOT_ID.getName(), false, false).getIndex();
+ RexNode snapshotIdInputRef = rexBuilder.makeInputRef(
+ tableScan.getTable().getRowType().getFieldList().get(snapshotIdIndex).getType(), snapshotIdIndex);
+
+ final RelBuilder relBuilder = call.builder();
+ relBuilder.push(tableScan);
+ List<RexNode> conds = new ArrayList<>();
+ final RexNode literalHighWatermark = rexBuilder.makeLiteral(
+ mvMetaTableSnapshot.getSnapshotId(), snapshotIdType(relBuilder), false);
+ conds.add(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+ ImmutableList.of(snapshotIdInputRef, literalHighWatermark)));
+ relBuilder.filter(conds);
+ call.transformTo(relBuilder.build());
+ }
+}
\ No newline at end of file
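As a rough illustration of what the rule produces: the table name, snapshot id and plan shapes below are hypothetical, Guava's ImmutableMap is used for brevity, and a SnapshotContext(long) constructor is assumed.

    // Hypothetical: MV metadata recorded snapshot 42 for source table db.tbl,
    // and the table has newer snapshots now.
    RelOptRule rule = HiveAugmentSnapshotMaterializationRule.with(
        ImmutableMap.of("db.tbl", new SnapshotContext(42L)));
    //
    // before:  HiveTableScan(table=[[db, tbl]])
    // after:   HiveFilter(condition=[<=(SNAPSHOT__ID, 42)])
    //            HiveTableScan(table=[[db, tbl]])   // versionIntervalFrom set to "42"
    //
    // The filter is only a compensation predicate for the union rewrite; it is later
    // replaced by TRUE and pushed into the scan by HivePushdownSnapshotFilterRule.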
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java
index 4767b08f841..b00067f9fd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HiveMaterializedViewUtils.java
@@ -32,6 +32,7 @@ import org.apache.calcite.adapter.druid.DruidQuery;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptMaterialization;
+import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
@@ -78,6 +79,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.TxnIdUtils;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -236,17 +238,26 @@ public class HiveMaterializedViewUtils {
HiveRelOptMaterialization materialization, String validTxnsList,
MaterializationSnapshot snapshot) throws LockException {
+ RelNode modifiedQueryRel;
if (snapshot != null && snapshot.getTableSnapshots() != null && !snapshot.getTableSnapshots().isEmpty()) {
- // Not supported yet for Iceberg tables
- return materialization;
+ modifiedQueryRel = applyRule(
+ materialization.queryRel, HiveAugmentSnapshotMaterializationRule.with(snapshot.getTableSnapshots()));
+ } else {
+ String materializationTxnList = snapshot != null ? snapshot.getValidTxnList() : null;
+ modifiedQueryRel = augmentMaterializationWithTimeInformation(
+ materialization, validTxnsList, new ValidTxnWriteIdList(materializationTxnList));
}
- String materializationTxnList = snapshot != null ? snapshot.getValidTxnList() : null;
- return augmentMaterializationWithTimeInformation(
- materialization, validTxnsList, new ValidTxnWriteIdList(materializationTxnList));
+ return new HiveRelOptMaterialization(materialization.tableRel, modifiedQueryRel,
+ null, materialization.qualifiedTableName, materialization.getScope(), materialization.getRebuildMode(),
+ materialization.getAst());
}
- private static HiveRelOptMaterialization augmentMaterializationWithTimeInformation(
+ /**
+ * Method to enrich the materialization query contained in the input with its
+ * invalidation information when the materialization has native ACID source tables.
+ */
+ private static RelNode augmentMaterializationWithTimeInformation(
HiveRelOptMaterialization materialization, String validTxnsList,
ValidTxnWriteIdList materializationTxnList) throws LockException {
// Extract tables used by the query which will in turn be used to generate
@@ -266,15 +277,22 @@ public class HiveMaterializedViewUtils {
SessionState.get().getTxnMgr().getValidWriteIds(tablesUsed, validTxnsList);
// Augment
final RexBuilder rexBuilder = materialization.queryRel.getCluster().getRexBuilder();
- final HepProgramBuilder augmentMaterializationProgram = new HepProgramBuilder()
- .addRuleInstance(new HiveAugmentMaterializationRule(rexBuilder, currentTxnList, materializationTxnList));
- final HepPlanner augmentMaterializationPlanner = new HepPlanner(
- augmentMaterializationProgram.build());
- augmentMaterializationPlanner.setRoot(materialization.queryRel);
- final RelNode modifiedQueryRel = augmentMaterializationPlanner.findBestExp();
- return new HiveRelOptMaterialization(materialization.tableRel, modifiedQueryRel,
- null, materialization.qualifiedTableName, materialization.getScope(), materialization.getRebuildMode(),
- materialization.getAst());
+ return applyRule(
+ materialization.queryRel, new HiveAugmentMaterializationRule(rexBuilder, currentTxnList, materializationTxnList));
+ }
+
+ /**
+ * Method to apply a rule to a query plan.
+ */
+ @NotNull
+ private static RelNode applyRule(
+ RelNode basePlan, RelOptRule relOptRule) {
+ final HepProgramBuilder programBuilder = new HepProgramBuilder();
+ programBuilder.addRuleInstance(relOptRule);
+ final HepPlanner planner = new HepPlanner(
+ programBuilder.build());
+ planner.setRoot(basePlan);
+ return planner.findBestExp();
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HivePushdownSnapshotFilterRule.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HivePushdownSnapshotFilterRule.java
new file mode 100644
index 00000000000..2de1046d219
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/rules/views/HivePushdownSnapshotFilterRule.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.calcite.rules.views;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexTableInputRef;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.calcite.HiveRelFactories;
+import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
+import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveFilter;
+
+import java.util.Set;
+
+/**
+ * Calcite rule to push down predicates that contain a {@link VirtualColumn#SNAPSHOT_ID} reference into the TableScan.
+ * <p>
+ * This rule traverses the logical expression in {@link HiveFilter} operators and searches for
+ * predicates like
+ * <p>
+ * <code>
+ * snapshotId <= 12345677899
+ * </code>
+ * <p>
+ * The literal is stored on the {@link RelOptHiveTable#getHiveTableMD()} object wrapped by
+ * {@link org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan},
+ * and the original predicate in the {@link HiveFilter} is replaced with the literal true.
+ *
+ * @see HiveAugmentSnapshotMaterializationRule
+ */
+public class HivePushdownSnapshotFilterRule extends RelRule<HivePushdownSnapshotFilterRule.Config> {
+
+ public static final RelOptRule INSTANCE =
+ RelRule.Config.EMPTY.as(HivePushdownSnapshotFilterRule.Config.class)
+ .withRelBuilderFactory(HiveRelFactories.HIVE_BUILDER)
+ .withOperandSupplier(operandBuilder -> operandBuilder.operand(HiveFilter.class).anyInputs())
+ .withDescription("HivePushdownSnapshotFilterRule")
+ .toRule();
+
+ public interface Config extends RelRule.Config {
+ @Override
+ default HivePushdownSnapshotFilterRule toRule() {
+ return new HivePushdownSnapshotFilterRule(this);
+ }
+ }
+
+ private HivePushdownSnapshotFilterRule(Config config) {
+ super(config);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ HiveFilter filter = call.rel(0);
+ RexNode newCondition = filter.getCondition().accept(new SnapshotIdShuttle(call.builder().getRexBuilder(), call.getMetadataQuery(), filter));
+ call.transformTo(call.builder().push(filter.getInput()).filter(newCondition).build());
+ }
+
+ static class SnapshotIdShuttle extends RexShuttle {
+
+ private final RexBuilder rexBuilder;
+ private final RelMetadataQuery metadataQuery;
+ private final RelNode startNode;
+
+ public SnapshotIdShuttle(RexBuilder rexBuilder, RelMetadataQuery metadataQuery, RelNode startNode) {
+ this.rexBuilder = rexBuilder;
+ this.metadataQuery = metadataQuery;
+ this.startNode = startNode;
+ }
+
+ @Override
+ public RexNode visitCall(RexCall call) {
+ if (call.operands.size() == 2 &&
+ (setSnapShotId(call.operands.get(0), call.operands.get(1)) ||
+ setSnapShotId(call.operands.get(1), call.operands.get(0)))) {
+ return rexBuilder.makeLiteral(true);
+ }
+
+ return super.visitCall(call);
+ }
+
+ private boolean setSnapShotId(RexNode op1, RexNode op2) {
+ if (!op1.isA(SqlKind.LITERAL)) {
+ return false;
+ }
+
+ RexLiteral literal = (RexLiteral) op1;
+ if (literal.getType().getSqlTypeName().getFamily() != SqlTypeFamily.NUMERIC) {
+ return false;
+ }
+
+ long snapshotId = literal.getValueAs(Long.class);
+
+ RelOptTable relOptTable = getRelOptTableOf(op2);
+ if (relOptTable == null) {
+ return false;
+ }
+
+ RelOptHiveTable hiveTable = (RelOptHiveTable) relOptTable;
+ hiveTable.getHiveTableMD().setVersionIntervalFrom(Long.toString(snapshotId));
+ return true;
+ }
+
+ private RelOptTable getRelOptTableOf(RexNode rexNode) {
+ if (!(rexNode instanceof RexInputRef)) {
+ return null;
+ }
+
+ RexInputRef rexInputRef = (RexInputRef) rexNode;
+ Set<RexNode> rexNodeSet = metadataQuery.getExpressionLineage(startNode, rexInputRef);
+ if (rexNodeSet == null || rexNodeSet.size() != 1) {
+ return null;
+ }
+
+ RexNode resultRexNode = rexNodeSet.iterator().next();
+ if (!(resultRexNode instanceof RexTableInputRef)) {
+ return null;
+ }
+ RexTableInputRef tableInputRef = (RexTableInputRef) resultRexNode;
+
+ RelOptTable relOptTable = tableInputRef.getTableRef().getTable();
+ RelDataTypeField snapshotIdField = relOptTable.getRowType().getField(
+ VirtualColumn.SNAPSHOT_ID.getName(), false, false);
+ if (snapshotIdField == null) {
+ return null;
+ }
+
+ return snapshotIdField.getIndex() == tableInputRef.getIndex() ? relOptTable : null;
+ }
+ }
+}
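The rule can be applied with a plain HepPlanner pass after the union rewrite, in the same way HiveMaterializedViewUtils#applyRule drives the augment rule above. The wiring shown here is a sketch (the rewrittenPlan variable is illustrative), not necessarily how this commit registers the rule:

    HepProgramBuilder program = new HepProgramBuilder()
        .addRuleInstance(HivePushdownSnapshotFilterRule.INSTANCE);
    HepPlanner planner = new HepPlanner(program.build());
    planner.setRoot(rewrittenPlan);              // plan still containing SNAPSHOT__ID <= <id> compensation filters
    RelNode pushedDown = planner.findBestExp();  // filters become TRUE; the interval moves onto the table scans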
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 39ebf61d1a0..d8beb0cdb89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -99,6 +99,12 @@ public class ASTBuilder {
b.add(asOfBuilder);
}
+ if (hTbl.getHiveTableMD().getVersionIntervalFrom() != null) {
+ ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_FROM_VERSION, "TOK_FROM_VERSION")
+ .add(HiveParser.Number, hTbl.getHiveTableMD().getVersionIntervalFrom());
+ b.add(asOfBuilder);
+ }
+
ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST");
if (scan instanceof DruidQuery) {
//Passing query spec, column names and column types to be used as part of Hive Physical execution
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index f5d9a8d3cdd..36fd7e471c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -572,6 +572,7 @@ public abstract class BaseSemanticAnalyzer {
int ssampleIndex = -1;
int asOfTimeIndex = -1;
int asOfVersionIndex = -1;
+ int asOfVersionFromIndex = -1;
for (int index = 1; index < tabref.getChildCount(); index++) {
ASTNode ct = (ASTNode) tabref.getChild(index);
if (ct.getToken().getType() == HiveParser.TOK_TABLEBUCKETSAMPLE) {
@@ -584,11 +585,14 @@ public abstract class BaseSemanticAnalyzer {
asOfTimeIndex = index;
} else if (ct.getToken().getType() == HiveParser.TOK_AS_OF_VERSION) {
asOfVersionIndex = index;
+ } else if (ct.getToken().getType() == HiveParser.TOK_FROM_VERSION) {
+ asOfVersionFromIndex = index;
} else {
aliasIndex = index;
}
}
- return new int[] {aliasIndex, propsIndex, tsampleIndex, ssampleIndex, asOfTimeIndex, asOfVersionIndex};
+ return new int[] {
+ aliasIndex, propsIndex, tsampleIndex, ssampleIndex, asOfTimeIndex, asOfVersionIndex, asOfVersionFromIndex};
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index ac69a5e7595..d8850ffbafb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -353,6 +353,7 @@ import java.util.stream.IntStream;
import javax.sql.DataSource;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.hadoop.hive.ql.optimizer.calcite.HiveMaterializedViewASTSubQueryRewriteShuttle.getMaterializedViewByAST;
import static org.apache.hadoop.hive.ql.metadata.HiveRelOptMaterialization.RewriteAlgorithm.ANY;
@@ -2947,6 +2948,10 @@ public class CalcitePlanner extends SemanticAnalyzer {
if (AcidUtils.isNonNativeAcidTable(tabMetaData, false)) {
virtualCols.addAll(tabMetaData.getStorageHandler().acidVirtualColumns());
}
+ if (tabMetaData.isNonNative() && tabMetaData.getStorageHandler().areSnapshotsSupported() &&
+ isBlank(tabMetaData.getMetaTable())) {
+ virtualCols.add(VirtualColumn.SNAPSHOT_ID);
+ }
for (VirtualColumn vc : virtualCols) {
colInfo = new ColumnInfo(vc.getName(), vc.getTypeInfo(), tableAlias, true,
vc.getIsHidden());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
index 9caa0e7fb28..ca347878220 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
@@ -28,7 +28,6 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
import org.apache.hadoop.hive.ql.ddl.view.create.CreateMaterializedViewDesc;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -51,7 +50,7 @@ public class QB {
private HashMap<String, QBExpr> aliasToSubqExpr;
private HashMap<String, Table> viewAliasToViewSchema;
private HashMap<String, Map<String, String>> aliasToProps;
- private HashMap<String, Pair<String, String>> aliasToAsOf;
+ private HashMap<String, QBSystemVersion> aliasToSystemVersion;
private List<String> aliases;
private QBParseInfo qbp;
private QBMetaData qbm;
@@ -134,7 +133,7 @@ public class QB {
destToWindowingSpec = new LinkedHashMap<String, WindowingSpec>();
id = getAppendedAliasFromId(outer_id, alias);
aliasInsideView = new HashSet<>();
- aliasToAsOf = new LinkedHashMap<>();
+ aliasToSystemVersion = new LinkedHashMap<>();
}
// For sub-queries, the id. and alias should be appended since same aliases can be re-used
@@ -195,8 +194,8 @@ public class QB {
aliasToProps.put(alias.toLowerCase(), props);
}
- public void setAsOf(String alias, Pair<String, String> asOf) {
- aliasToAsOf.put(alias.toLowerCase(), asOf);
+ public void setSystemVersion(String alias, QBSystemVersion asOf) {
+ aliasToSystemVersion.put(alias.toLowerCase(), asOf);
}
public void addAlias(String alias) {
@@ -257,8 +256,8 @@ public class QB {
return aliasToProps.get(alias.toLowerCase());
}
- public Pair<String, String> getAsOfForAlias(String alias) {
- return aliasToAsOf.get(alias.toLowerCase());
+ public QBSystemVersion getSystemVersionForAlias(String alias) {
+ return aliasToSystemVersion.get(alias.toLowerCase());
}
public void rewriteViewToSubq(String alias, String viewName, QBExpr qbexpr, Table tab) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSystemVersion.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSystemVersion.java
new file mode 100644
index 00000000000..e579e936fb3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSystemVersion.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+public class QBSystemVersion {
+ private final String asOfVersion;
+ private final String fromVersion;
+ private final String asOfTime;
+
+ public QBSystemVersion(String asOfVersion, String fromVersion, String asOfTime) {
+ this.asOfVersion = asOfVersion;
+ this.fromVersion = fromVersion;
+ this.asOfTime = asOfTime;
+ }
+
+ public String getAsOfTime() {
+ return asOfTime;
+ }
+
+ public String getAsOfVersion() {
+ return asOfVersion;
+ }
+
+ public String getFromVersion() {
+ return fromVersion;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 067c35dac40..0b51b46b1c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -1112,6 +1112,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
int ssampleIndex = indexes[3];
int asOfTimeIndex = indexes[4];
int asOfVersionIndex = indexes[5];
+ int asOfVersionFromIndex = indexes[6];
ASTNode tableTree = (ASTNode) (tabref.getChild(0));
@@ -1129,8 +1130,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
qb.setTabProps(alias, props);
}
- if (asOfTimeIndex != -1 || asOfVersionIndex != -1) {
+ if (asOfTimeIndex != -1 || asOfVersionIndex != -1 || asOfVersionFromIndex != -1) {
String asOfVersion = asOfVersionIndex == -1 ? null : tabref.getChild(asOfVersionIndex).getChild(0).getText();
+ String asOfVersionFrom =
+ asOfVersionFromIndex == -1 ? null : tabref.getChild(asOfVersionFromIndex).getChild(0).getText();
String asOfTime = null;
if (asOfTimeIndex != -1) {
@@ -1143,8 +1146,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
asOfTime = stripQuotes(expr.getText());
}
}
- Pair<String, String> asOf = Pair.of(asOfVersion, asOfTime);
- qb.setAsOf(alias, asOf);
+ qb.setSystemVersion(alias, new QBSystemVersion(asOfVersion, asOfVersionFrom, asOfTime));
}
// If the alias is already there then we have a conflict
@@ -2268,13 +2270,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- Pair<String, String> asOf = qb.getAsOfForAlias(alias);
+ QBSystemVersion asOf = qb.getSystemVersionForAlias(alias);
if (asOf != null) {
if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) {
throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias);
}
- tab.setAsOfVersion(asOf.getLeft());
- tab.setAsOfTimestamp(asOf.getRight());
+ tab.setAsOfVersion(asOf.getAsOfVersion());
+ tab.setVersionIntervalFrom(asOf.getFromVersion());
+ tab.setAsOfTimestamp(asOf.getAsOfTime());
}
if (tab.isView()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 972a85ed454..9c45457b9e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -34,12 +34,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.mapred.TextInputFormat;
import java.io.Serializable;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -115,6 +112,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
public static final String AS_OF_VERSION =
"hive.io.as.of.version";
+ public static final String FROM_VERSION =
+ "hive.io.version.from";
+
// input file name (big) to bucket number
private Map<String, Integer> bucketFileNameMapping;
@@ -140,6 +140,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
private int numBuckets = -1;
private String asOfVersion = null;
+ private String versionIntervalFrom = null;
private String asOfTimestamp = null;
@@ -172,6 +173,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
numBuckets = tblMetadata.getNumBuckets();
asOfTimestamp = tblMetadata.getAsOfTimestamp();
asOfVersion = tblMetadata.getAsOfVersion();
+ versionIntervalFrom = tblMetadata.getVersionIntervalFrom();
}
isTranscationalTable = AcidUtils.isTransactionalTable(this.tableMetadata);
if (isTranscationalTable) {
@@ -531,6 +533,11 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
return asOfVersion;
}
+ @Explain(displayName = "Version interval from")
+ public String getVersionIntervalFrom() {
+ return versionIntervalFrom;
+ }
+
@Explain(displayName = "As of timestamp")
public String getAsOfTimestamp() {
return asOfTimestamp;
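Downstream, a split generator can pick the interval up from the job configuration and turn it into an incremental scan. A minimal sketch assuming Iceberg's TableScan API, with job and icebergTable as assumed handles; the actual HiveInputFormat/IcebergInputFormat changes in this commit may differ:

    String fromVersion = job.get(TableScanDesc.FROM_VERSION);   // "hive.io.version.from"
    String asOfVersion = job.get(TableScanDesc.AS_OF_VERSION);  // "hive.io.as.of.version"
    org.apache.iceberg.TableScan scan = icebergTable.newScan();
    if (fromVersion != null) {
      // read only the data appended after the MV's recorded snapshot, up to the current snapshot
      scan = scan.appendsAfter(Long.parseLong(fromVersion));
    } else if (asOfVersion != null) {
      scan = scan.useSnapshot(Long.parseLong(asOfVersion));
    }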
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java b/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
index 40daba97e6b..7d2080fb917 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/type/SnapshotContext.java
@@ -63,4 +63,11 @@ public class SnapshotContext {
public int hashCode() {
return Objects.hash(snapshotId);
}
+
+ @Override
+ public String toString() {
+ return "SnapshotContext{" +
+ "snapshotId=" + snapshotId +
+ '}';
+ }
}