You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2022/08/08 12:42:21 UTC

[hive] branch master updated: HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new fa111f15008 HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter)
fa111f15008 is described below

commit fa111f150086f0069e3a6a5b8e6f34a4afae3b1b
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Mon Aug 8 14:42:16 2022 +0200

    HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter)
---
 .../mr/hive/vector/HiveVectorizedReader.java       |  22 +-
 .../parquet}/ParquetFooterInputFromCache.java      | 122 +++---
 .../queries/positive/llap_iceberg_read_parquet.q   | 122 ++++++
 .../positive/llap/llap_iceberg_read_parquet.q.out  | 465 +++++++++++++++++++++
 .../test/resources/testconfiguration.properties    |   4 +-
 .../org/apache/hadoop/hive/llap/io/api/LlapIo.java |  17 +
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |  58 +++
 .../hive/llap/io/encoded/LlapOrcCacheLoader.java   |   4 +-
 .../hive/llap/io/encoded/OrcEncodedDataReader.java |  18 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java    |  11 +-
 .../org/apache/hadoop/hive/llap/LlapHiveUtils.java |  30 +-
 .../apache/hadoop/hive/ql/io/SyntheticFileId.java  |  29 ++
 .../vector/ParquetFooterInputFromCache.java        |   4 +-
 .../vector/VectorizedParquetRecordReader.java      |  57 +--
 14 files changed, 839 insertions(+), 124 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 00b9b3c73f0..6e23dff92c6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -42,7 +43,9 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hive.iceberg.org.apache.orc.OrcConf;
+import org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -53,6 +56,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
 import org.apache.iceberg.orc.VectorizedReadUtils;
+import org.apache.iceberg.parquet.ParquetFooterInputFromCache;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -125,6 +129,7 @@ public class HiveVectorizedReader {
       // TODO: Iceberg currently does not track the last modification time of a file. Until that's added,
       // we need to set Long.MIN_VALUE as last modification time in the fileId triplet.
       SyntheticFileId fileId = new SyntheticFileId(path, task.file().fileSizeInBytes(), Long.MIN_VALUE);
+      fileId.toJobConf(job);
       RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
 
       switch (format) {
@@ -133,7 +138,7 @@ public class HiveVectorizedReader {
           break;
 
         case PARQUET:
-          recordReader = parquetRecordReader(job, reporter, task, path, start, length);
+          recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId);
           break;
         default:
           throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format);
@@ -182,11 +187,22 @@ public class HiveVectorizedReader {
   }
 
   private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader(JobConf job, Reporter reporter,
-      FileScanTask task, Path path, long start, long length) throws IOException {
+      FileScanTask task, Path path, long start, long length, SyntheticFileId fileId) throws IOException {
     InputSplit split = new FileSplit(path, start, length, job);
     VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat();
 
-    MessageType fileSchema = ParquetFileReader.readFooter(job, path).getFileMetaData().getSchema();
+    MemoryBufferOrBuffers footerData = null;
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
+        LlapProxy.getIo() != null) {
+      LlapProxy.getIo().initCacheOnlyInputFormat(inputFormat);
+      footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(path, job, fileId);
+    }
+
+    ParquetMetadata parquetMetadata = footerData != null ?
+        ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER) :
+        ParquetFileReader.readFooter(job, path);
+
+    MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
     MessageType typeWithIds = null;
     Schema expectedSchema = task.spec().schema();
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
similarity index 59%
copy from ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
copy to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
index e2e60670cae..bd4bd74106f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
@@ -1,42 +1,52 @@
 /*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hadoop.hive.ql.io.parquet.vector;
+package org.apache.iceberg.parquet;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.io.InputFile;
-import org.apache.parquet.io.SeekableInputStream;
+import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.hive.iceberg.org.apache.parquet.io.InputFile;
+import org.apache.hive.iceberg.org.apache.parquet.io.SeekableInputStream;
 
 /**
+ * Copy of ParquetFooterInputFromCache from hive-exec module to switch dependent Parquet packages
+ * to the shaded version (org.apache.hive.iceberg.org.apache.parquet.io...)
+ *
  * The Parquet InputFile implementation that allows the reader to
  * read the footer from cache without being aware of the latter.
  * This implements both InputFile and the InputStream that the reader gets from InputFile.
  */
-final class ParquetFooterInputFromCache
+public final class ParquetFooterInputFromCache
     extends SeekableInputStream implements InputFile {
-  final static int FOOTER_LENGTH_SIZE = 4; // For the file size check.
+  public static final int FOOTER_LENGTH_SIZE = 4; // For the file size check.
   private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE;
   private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length;
-  private final int length, footerLength;
-  private int position = 0, bufferIx = 0, bufferPos = 0;
+  private final int length;
+  private final int footerLength;
+  private int position = 0;
+  private int bufferIx = 0;
+  private int bufferPos = 0;
   private final MemoryBuffer[] cacheData;
   private final int[] positions;
 
@@ -50,18 +60,18 @@ final class ParquetFooterInputFromCache
       cacheData = new MemoryBuffer[bufs.length + 1];
       System.arraycopy(bufs, 0, cacheData, 0, bufs.length);
     }
-    int footerLength = 0;
+    int footerLen = 0;
     positions = new int[cacheData.length];
     for (int i = 0; i < cacheData.length - 1; ++i) {
-      positions[i] = footerLength;
+      positions[i] = footerLen;
       int dataLen = cacheData[i].getByteBufferRaw().remaining();
       assert dataLen > 0;
-      footerLength += dataLen;
+      footerLen += dataLen;
     }
-    positions[cacheData.length - 1] = footerLength;
-    cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLength);
-    this.footerLength = footerLength;
-    this.length = footerLength + FAKE_PREFIX_LENGTH + TAIL_LENGTH;
+    positions[cacheData.length - 1] = footerLen;
+    cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLen);
+    this.footerLength = footerLen;
+    this.length = footerLen + FAKE_PREFIX_LENGTH + TAIL_LENGTH;
   }
 
   @Override
@@ -71,7 +81,7 @@ final class ParquetFooterInputFromCache
 
   @Override
   public SeekableInputStream newStream() throws IOException {
-    // Note: this doesn't maintain proper newStream semantics (if any). 
+    // Note: this doesn't maintain proper newStream semantics (if any).
     //       We could either clone this instead or enforce that this is only called once.
     return this;
   }
@@ -82,9 +92,9 @@ final class ParquetFooterInputFromCache
   }
 
   @Override
-  public void seek(long targetPos) throws IOException {
-    this.position = (int)targetPos;
-    targetPos -= FAKE_PREFIX_LENGTH;
+  public void seek(long pos) throws IOException {
+    this.position = (int) pos;
+    long targetPos = pos - FAKE_PREFIX_LENGTH;
     // Not efficient, but we don't expect this to be called frequently.
     for (int i = 1; i <= positions.length; ++i) {
       int endPos = (i == positions.length) ? (length - FAKE_PREFIX_LENGTH) : positions[i];
@@ -94,25 +104,32 @@ final class ParquetFooterInputFromCache
         return;
       }
     }
-    throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength
-        + Arrays.toString(positions));
+    throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength +
+        Arrays.toString(positions));
   }
 
   @Override
   public void readFully(byte[] b, int offset, int len) throws IOException {
-    if (readInternal(b, offset, len) == len) return;
+    if (readInternal(b, offset, len) == len) {
+      return;
+    }
     throw new EOFException();
   }
 
-  public int readInternal(byte[] b, int offset, int len) {
-    if (position >= length) return -1;
-    int argPos = offset, argEnd = offset + len;
+  public int readInternal(byte[] bytes, int offset, int len) {
+    if (position >= length) {
+      return -1;
+    }
+    int argPos = offset;
+    int argEnd = offset + len;
     while (argPos < argEnd) {
-      if (bufferIx == cacheData.length) return (argPos - offset);
+      if (bufferIx == cacheData.length) {
+        return argPos - offset;
+      }
       ByteBuffer data = cacheData[bufferIx].getByteBufferDup();
       int toConsume = Math.min(argEnd - argPos, data.remaining() - bufferPos);
       data.position(data.position() + bufferPos);
-      data.get(b, argPos, toConsume);
+      data.get(bytes, argPos, toConsume);
       if (data.remaining() == 0) {
         ++bufferIx;
         bufferPos = 0;
@@ -126,7 +143,9 @@ final class ParquetFooterInputFromCache
 
   @Override
   public int read() throws IOException {
-    if (position >= length) return -1;
+    if (position >= length) {
+      return -1;
+    }
     ++position;
     ByteBuffer data = cacheData[bufferIx].getByteBufferRaw();
     int bp = bufferPos;
@@ -148,9 +167,9 @@ final class ParquetFooterInputFromCache
         bb.position(bb.position() + result);
       }
     } else {
-      byte[] b = new byte[bb.remaining()];
-      result = readInternal(b, 0, bb.remaining());
-      bb.put(b, 0, result);
+      byte[] bytes = new byte[bb.remaining()];
+      result = readInternal(bytes, 0, bb.remaining());
+      bb.put(bytes, 0, result);
     }
     return result;
   }
@@ -169,18 +188,19 @@ final class ParquetFooterInputFromCache
    * The fake buffer that emulates end of file, with footer length and magic. Given that these
    * can be generated based on the footer buffer itself, we don't cache them.
    */
-  private final static class FooterEndBuffer implements MemoryBuffer {
+  private static final class FooterEndBuffer implements MemoryBuffer {
     private final ByteBuffer bb;
-    public FooterEndBuffer(int footerLength) {
-      byte[] b = new byte[8];
-      b[0] = (byte) ((footerLength >>>  0) & 0xFF);
-      b[1] = (byte) ((footerLength >>>  8) & 0xFF);
-      b[2] = (byte) ((footerLength >>> 16) & 0xFF);
-      b[3] = (byte) ((footerLength >>> 24) & 0xFF);
+
+    FooterEndBuffer(int footerLength) {
+      byte[] bytes = new byte[8];
+      bytes[0] = (byte) ((footerLength >>>  0) & 0xFF);
+      bytes[1] = (byte) ((footerLength >>>  8) & 0xFF);
+      bytes[2] = (byte) ((footerLength >>> 16) & 0xFF);
+      bytes[3] = (byte) ((footerLength >>> 24) & 0xFF);
       for (int i = 0; i < ParquetFileWriter.MAGIC.length; ++i) {
-        b[4 + i] = ParquetFileWriter.MAGIC[i];
+        bytes[4 + i] = ParquetFileWriter.MAGIC[i];
       }
-      bb = ByteBuffer.wrap(b);
+      bb = ByteBuffer.wrap(bytes);
     }
 
     @Override
@@ -193,4 +213,4 @@ final class ParquetFooterInputFromCache
       return bb.duplicate();
     }
   }
-}
\ No newline at end of file
+}
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q
new file mode 100644
index 00000000000..d961dc74d6b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q
@@ -0,0 +1,122 @@
+--test against vectorized LLAP execution mode
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+
+DROP TABLE IF EXISTS llap_orders_parquet PURGE;
+DROP TABLE IF EXISTS llap_items_parquet PURGE;
+DROP TABLE IF EXISTS mig_source_parquet PURGE;
+
+
+CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET;
+INSERT INTO llap_items_parquet VALUES
+(0, 35000,  'Sedan',     'Model 3', 'Standard range plus'),
+(1, 45000,  'Sedan',     'Model 3', 'Long range'),
+(2, 50000,  'Sedan',     'Model 3', 'Performance'),
+(3, 48000,  'Crossover', 'Model Y', 'Long range'),
+(4, 55000,  'Crossover', 'Model Y', 'Performance'),
+(5, 83000,  'Sports',    'Model S', 'Long range'),
+(6, 123000, 'Sports',   'Model S', 'Plaid');
+
+CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET;
+INSERT INTO llap_orders_parquet VALUES
+(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'),
+(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'),
+(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'),
+(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'),
+(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'),
+(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'),
+(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'),
+(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'),
+(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'),
+(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'),
+(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'),
+(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'),
+(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'),
+(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'),
+(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'),
+(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'),
+(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'),
+(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'),
+(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'),
+(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'),
+(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA');
+
+--select query without any schema change yet
+SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description;
+
+
+--schema evolution on unpartitioned table
+--renames and reorders
+ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description;
+ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name;
+SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description;
+
+--adding a column
+ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float);
+INSERT INTO llap_items_parquet VALUES
+(7, 'Model X', 93000, 'Long range', 'SUV', 3.8),
+(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5);
+SELECT cat, min(to60) from llap_items_parquet group by cat;
+
+--removing a column
+ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float);
+INSERT INTO llap_items_parquet VALUES
+(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5),
+(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5);
+SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name;
+
+
+--schema evolution on partitioned table (including partition changes)
+--renames and reorders
+ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2;
+ALTER TABLE llap_orders_parquet CHANGE p1 region string;
+INSERT INTO llap_orders_parquet VALUES
+(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129'));
+SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region;
+
+ALTER TABLE llap_orders_parquet CHANGE p2 state string;
+SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state;
+
+--adding new column
+ALTER TABLE llap_orders_parquet ADD COLUMNS (city string);
+INSERT INTO llap_orders_parquet VALUES
+(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München');
+SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+--making it a partition column
+ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city);
+INSERT INTO llap_orders_parquet VALUES
+(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia');
+SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+--de-partitioning a column
+ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city);
+INSERT INTO llap_orders_parquet VALUES
+(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London');
+SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+--removing a column from schema
+ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string);
+INSERT INTO llap_orders_parquet VALUES
+(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris');
+SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+
+--some more projections
+SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name;
+SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description;
+
+---------------------------------------------
+--Test migrated partitioned table gets cached
+
+CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET;
+INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US');
+ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
+
+-- Should miss, but fill cache
+SELECT region, SUM(id) from mig_source_parquet GROUP BY region;
+
+-- Should hit cache
+set hive.llap.io.cache.only=true;
+SELECT region, SUM(id) from mig_source_parquet GROUP BY region;
+set hive.llap.io.cache.only=false;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out
new file mode 100644
index 00000000000..a7ae1a2c34d
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out
@@ -0,0 +1,465 @@
+PREHOOK: query: DROP TABLE IF EXISTS llap_orders_parquet PURGE
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS llap_orders_parquet PURGE
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS llap_items_parquet PURGE
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS llap_items_parquet PURGE
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS mig_source_parquet PURGE
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS mig_source_parquet PURGE
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: INSERT INTO llap_items_parquet VALUES
+(0, 35000,  'Sedan',     'Model 3', 'Standard range plus'),
+(1, 45000,  'Sedan',     'Model 3', 'Long range'),
+(2, 50000,  'Sedan',     'Model 3', 'Performance'),
+(3, 48000,  'Crossover', 'Model Y', 'Long range'),
+(4, 55000,  'Crossover', 'Model Y', 'Performance'),
+(5, 83000,  'Sports',    'Model S', 'Long range'),
+(6, 123000, 'Sports',   'Model S', 'Plaid')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: INSERT INTO llap_items_parquet VALUES
+(0, 35000,  'Sedan',     'Model 3', 'Standard range plus'),
+(1, 45000,  'Sedan',     'Model 3', 'Long range'),
+(2, 50000,  'Sedan',     'Model 3', 'Performance'),
+(3, 48000,  'Crossover', 'Model Y', 'Long range'),
+(4, 55000,  'Crossover', 'Model Y', 'Performance'),
+(5, 83000,  'Sports',    'Model S', 'Long range'),
+(6, 123000, 'Sports',   'Model S', 'Plaid')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'),
+(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'),
+(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'),
+(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'),
+(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'),
+(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'),
+(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'),
+(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'),
+(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'),
+(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'),
+(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'),
+(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'),
+(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'),
+(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'),
+(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'),
+(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'),
+(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'),
+(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'),
+(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'),
+(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'),
+(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'),
+(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'),
+(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'),
+(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'),
+(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'),
+(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'),
+(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'),
+(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'),
+(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'),
+(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'),
+(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'),
+(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'),
+(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'),
+(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'),
+(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'),
+(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'),
+(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'),
+(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'),
+(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'),
+(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'),
+(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+Model 3	Performance	42
+Model S	Long range	213
+Model S	Plaid	132
+Model Y	Performance	163
+PREHOOK: query: ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+Model S	Plaid	132
+PREHOOK: query: ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float)
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float)
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: INSERT INTO llap_items_parquet VALUES
+(7, 'Model X', 93000, 'Long range', 'SUV', 3.8),
+(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: INSERT INTO llap_items_parquet VALUES
+(7, 'Model X', 93000, 'Long range', 'SUV', 3.8),
+(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: SELECT cat, min(to60) from llap_items_parquet group by cat
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT cat, min(to60) from llap_items_parquet group by cat
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+Crossover	NULL
+SUV	2.5
+Sedan	NULL
+Sports	NULL
+PREHOOK: query: ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: INSERT INTO llap_items_parquet VALUES
+(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5),
+(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: INSERT INTO llap_items_parquet VALUES
+(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5),
+(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+Cybertruck	4.5	50000
+Model S	NULL	123000
+Model X	2.5	113000
+Model Y	NULL	55000
+PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p1 region string
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p1 region string
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129'))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+EU	2000-01-04 19:55:46.129	153
+US	2007-06-24 19:23:22.829	12
+PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p2 state string
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p2 state string
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+EU	ES	2007-12-02 22:30:39.302	18
+EU	HU	2000-01-04 19:55:46.129	75
+EU	IT	2012-08-28 01:35:54.283	60
+US	TX	2007-06-24 19:23:22.829	12
+PREHOOK: query: ALTER TABLE llap_orders_parquet ADD COLUMNS (city string)
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet ADD COLUMNS (city string)
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE	München
+ES	NULL
+FR	NULL
+HU	NULL
+IT	NULL
+UK	NULL
+PREHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city)
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city)
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE	München	5.75
+ES	NULL	5.0
+FR	NULL	2.75
+HU	NULL	7.0
+IT	Venezia	6.0
+UK	NULL	1.0
+PREHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city)
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city)
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE	München	5.75
+ES	NULL	5.0
+FR	NULL	2.75
+HU	NULL	7.0
+IT	Venezia	6.0
+UK	London	2.0
+PREHOOK: query: ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE	München	5.75
+ES	NULL	5.0
+FR	Paris	3.2
+HU	NULL	7.0
+IT	Venezia	6.0
+UK	London	2.0
+PREHOOK: query: SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+München	Cybertruck	50000	4.5	99
+NULL	Model 3	50000	NULL	42
+Venezia	Model S	123000	NULL	89
+NULL	Model S	83000	NULL	185
+NULL	Model Y	55000	NULL	76
+PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid  WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+Cybertruck	Dual Motor AWD	99
+Model 3	Performance	42
+Model S	Long range	389
+Model S	Plaid	221
+Model Y	Performance	163
+PREHOOK: query: CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mig_source_parquet
+POSTHOOK: query: CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mig_source_parquet
+PREHOOK: query: INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@mig_source_parquet
+POSTHOOK: query: INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@mig_source_parquet
+POSTHOOK: Output: default@mig_source_parquet@region=EU
+POSTHOOK: Output: default@mig_source_parquet@region=US
+POSTHOOK: Lineage: mig_source_parquet PARTITION(region=EU).id SCRIPT []
+POSTHOOK: Lineage: mig_source_parquet PARTITION(region=US).id SCRIPT []
+PREHOOK: query: ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@mig_source_parquet
+PREHOOK: Output: default@mig_source_parquet
+POSTHOOK: query: ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@mig_source_parquet
+POSTHOOK: Output: default@mig_source_parquet
+PREHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+EU	6
+US	3
+PREHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+EU	6
+US	3
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index b32ad20d577..0b589fe4d36 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -404,9 +404,11 @@ erasurecoding.only.query.files=\
 
 iceberg.llap.query.files=\
   llap_iceberg_read_orc.q,\
+  llap_iceberg_read_parquet.q,\
   vectorized_iceberg_read_mixed.q,\
   vectorized_iceberg_read_orc.q,\
   vectorized_iceberg_read_parquet.q
 
 iceberg.llap.only.query.files=\
-  llap_iceberg_read_orc.q
+  llap_iceberg_read_orc.q,\
+  llap_iceberg_read_parquet.q
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index a4fc13a0ee0..c650288a93a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -62,6 +64,21 @@ public interface LlapIo<T> {
    */
   OrcTail getOrcTailFromCache(Path path, Configuration conf, CacheTag tag, @Nullable Object fileKey) throws IOException;
 
+
+  /**
+   * Returns the metadata buffers associated with the Parquet file on the given path.
+   * Content is either obtained from cache, or from disk if there is a cache miss.
+   * @param path Parquet file path
+   * @param conf jobConf
+   * @param fileKey fileId of the Parquet file (either the Long fileId of HDFS or the SyntheticFileId).
+   *                Optional; NOTE(review): LlapIoImpl skips the cache lookup for a null key and does not
+   *                generate one itself - callers may create it, see: org.apache.hadoop.hive.ql.io.HdfsUtils#getFileId()
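+   * @return the metadata (footer) buffers of the Parquet file, served from cache or freshly loaded from disk
+   * @throws IOException if the footer cannot be obtained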
+   */
+  MemoryBufferOrBuffers getParquetFooterBuffersFromCache(Path path, JobConf conf, @Nullable Object fileKey)
+      throws IOException;
+
   /**
    * Handles request to evict entities specified in the request object.
    * @param protoRequest lists Hive entities (DB, table, etc..) whose LLAP buffers should be evicted.
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index ecf8d2575c9..4634c4639f0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -27,10 +27,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
+import javax.annotation.Nullable;
 import javax.management.ObjectName;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
 import org.apache.hadoop.hive.llap.ProactiveEviction;
 import org.apache.hadoop.hive.llap.cache.LlapCacheHydration;
 import org.apache.hadoop.hive.llap.cache.MemoryLimitedPathCache;
@@ -82,6 +86,8 @@ import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.vector.ParquetFooterInputFromCache;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
@@ -93,12 +99,19 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.orc.impl.OrcTail;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
 
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
 public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
   public static final Logger LOG = LoggerFactory.getLogger("LlapIoImpl");
   public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc");
@@ -455,6 +468,50 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
     }
   }
 
+  @Override
+  public MemoryBufferOrBuffers getParquetFooterBuffersFromCache(Path path, JobConf conf, @Nullable Object fileKey)
+      throws IOException {
+
+    Preconditions.checkNotNull(fileMetadataCache, "Metadata cache must not be null");
+
+    boolean isReadCacheOnly = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_CACHE_ONLY);
+    CacheTag tag = VectorizedParquetRecordReader.cacheTagOfParquetFile(path, daemonConf, conf);
+
+    MemoryBufferOrBuffers footerData = (fileKey == null ) ? null
+        : fileMetadataCache.getFileMetadata(fileKey);
+    if (footerData != null) {
+      LOG.info("Found the footer in cache for " + fileKey);
+      try {
+        return footerData;
+      } finally {
+        fileMetadataCache.decRefBuffer(footerData);
+      }
+    } else {
+      throwIfCacheOnlyRead(isReadCacheOnly);
+    }
+
+    final FileSystem fs = path.getFileSystem(conf);
+    final FileStatus stat = fs.getFileStatus(path);
+
+    // To avoid reading the footer twice, we will cache it first and then read from cache.
+    // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact.
+    try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(path))) {
+      long footerLengthIndex = stat.getLen()
+          - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length;
+      stream.seek(footerLengthIndex);
+      int footerLength = BytesUtils.readIntLittleEndian(stream);
+      stream.seek(footerLengthIndex - footerLength);
+      LOG.info("Caching the footer of length " + footerLength + " for " + fileKey);
+      // Note: we don't pass in isStopped here - this is not on an IO thread.
+      footerData = fileMetadataCache.putFileMetadata(fileKey, footerLength, stream, tag, null);
+      try {
+        return footerData;
+      } finally {
+        fileMetadataCache.decRefBuffer(footerData);
+      }
+    }
+  }
+
   @Override
   public LlapDaemonProtocolProtos.CacheEntryList fetchCachedContentInfo() {
     if (useLowLevelCache) {
@@ -479,4 +536,5 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
       LOG.warn("Cannot load data into the cache. Low level cache is disabled.");
     }
   }
+
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java
index e4e38392b75..1fe56c08dd6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.io.DataCache;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.hadoop.hive.common.io.FileMetadataCache;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
@@ -42,7 +43,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.function.Supplier;
 
-import static org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.determineFileId;
 import static org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.getFsSupplier;
 
 /**
@@ -77,7 +77,7 @@ public class LlapOrcCacheLoader implements AutoCloseable {
 
   public void init() throws IOException {
     fsSupplier = getFsSupplier(path, daemonConf);
-    Object fileKey = determineFileId(fsSupplier, path, daemonConf);
+    Object fileKey = LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), path, daemonConf);
     if(!fileKey.equals(this.fileKey)) {
       throw new IOException("File key mismatch.");
     }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index aa03bde34e5..7d840599c9a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -499,8 +499,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     return true;
   }
 
-  private static Object determineFileId(Supplier<FileSystem> fsSupplier,
-    FileSplit split, Configuration daemonConf) throws IOException {
+  private static Object determineFileId(Supplier<FileSystem> fsSupplier, FileSplit split, Configuration daemonConf)
+      throws IOException {
 
     if (split instanceof OrcSplit) {
       Object fileKey = ((OrcSplit)split).getFileKey();
@@ -509,17 +509,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       }
     }
     LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
-    return determineFileId(fsSupplier, split.getPath(), daemonConf);
-  }
-
-  static Object determineFileId(Supplier<FileSystem> fsSupplier, Path path, Configuration daemonConf)
-      throws IOException {
-
-    boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
-    boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID);
-    boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH);
-
-    return HdfsUtils.getFileId(fsSupplier.get(), path, allowSynthetic, checkDefaultFs, forceSynthetic);
+    return LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), split.getPath(), daemonConf);
   }
 
   /**
@@ -590,7 +580,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       Configuration daemonConf, MetadataCache metadataCache, Object fileKey) throws IOException {
     Supplier<FileSystem> fsSupplier = getFsSupplier(path, jobConf);
     if (fileKey == null) {
-      fileKey = determineFileId(fsSupplier, path, daemonConf);
+      fileKey = LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), path, daemonConf);
     }
 
     if(fileKey == null || metadataCache == null) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index f9058ef7956..40b745e09ff 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCa
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
@@ -224,10 +223,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
 
     fs = split.getPath().getFileSystem(daemonConf);
     PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
-    fileKey = determineCacheKey(fs, split, partitionDesc,
-        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
-        HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
-        !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
+    fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
     cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
         ? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
     this.sourceInputFormat = sourceInputFormat;
@@ -1733,12 +1729,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
   }
 
   private static Object determineCacheKey(FileSystem fs, FileSplit split, PartitionDesc partitionDesc,
-      boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
+      Configuration daemonConf)
+      throws IOException {
     /* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
       Object fileKey = ((OrcSplit)split).getFileKey();
       if (fileKey != null) return fileKey; */
     LlapIoImpl.LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
-    Object fileId = HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
+    Object fileId = LlapHiveUtils.createFileIdUsingFS(fs, split.getPath(), daemonConf);
     return SchemaAwareCacheKey.buildCacheKey(fileId, LlapHiveUtils.getSchemaHash(partitionDesc));
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index 52126d971c1..ba62b8d89c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -18,16 +18,18 @@
 package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -145,4 +147,30 @@ public final class LlapHiveUtils {
     return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
   }
 
+  /**
+   * Determines the fileID for the given path using the FileSystem type provided while considering daemon configuration.
+   * Invokes HdfsUtils.getFileId(), the resulting file ID can be of types Long (inode) or SyntheticFileId depending
+   * on the FS type and the actual daemon configuration.
+   * Can be costly on cloud file systems.
+   * @param fs FileSystem instance used to look up the file
+   * @param path Path of the file to create the ID for
+   * @param daemonConf Llap daemon configuration
+   * @return the generated fileID, can be null in special cases (e.g. conf disallows synthetic ID on a non-HDFS FS)
+   * @throws IOException if HdfsUtils.getFileId() fails
+   */
+  public static Object createFileIdUsingFS(FileSystem fs, Path path, Configuration daemonConf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Will invoke HdfsUtils.getFileId - this is costly on cloud file systems. " +
+          "Turn on TRACE level logging to show call trace.");
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(Arrays.deepToString(Thread.currentThread().getStackTrace()));
+      }
+    }
+    boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
+    boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID);
+    boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH);
+
+    return HdfsUtils.getFileId(fs, path, allowSynthetic, checkDefaultFs, forceSynthetic);
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
index dc79e1076b0..56df1547910 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
@@ -21,12 +21,19 @@ package org.apache.hadoop.hive.ql.io;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
 
 public final class SyntheticFileId implements Writable {
+
+  private static final String JOBCONF_KEY = "SYNTHETIC_FILE_ID";
+  private static final Pattern STRING_PATTERN = Pattern.compile("\\[(-?\\d+),\\s(-?\\d+),\\s(\\d+)\\]");
+
   private long pathHash;
   private long modTime;
   private long length;
@@ -41,6 +48,16 @@ public final class SyntheticFileId implements Writable {
     this.length = len;
   }
 
+  private SyntheticFileId(String fileIdAsString) {
+    Matcher matcher = STRING_PATTERN.matcher(fileIdAsString);
+    if (!matcher.matches()) {
+      throw new IllegalArgumentException("Expected format " + STRING_PATTERN + " but got " + fileIdAsString);
+    }
+    this.pathHash = Long.parseLong(matcher.group(1));
+    this.modTime = Long.parseLong(matcher.group(2));
+    this.length = Long.parseLong(matcher.group(3));
+  }
+
   public SyntheticFileId(FileStatus file) {
     this(file.getPath(), file.getLen(), file.getModificationTime());
   }
@@ -109,4 +126,16 @@ public final class SyntheticFileId implements Writable {
   public long getLength() {
     return length;
   }
+
+  public void toJobConf(JobConf job) {
+    job.set(JOBCONF_KEY, this.toString());
+  }
+
+  public static SyntheticFileId fromJobConf(JobConf job) {
+    String idAsString = job.get(JOBCONF_KEY);
+    if (idAsString == null) {
+      return null;
+    }
+    return new SyntheticFileId(idAsString);
+  }
 }
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
index e2e60670cae..49960caf7aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
@@ -30,9 +30,9 @@ import org.apache.parquet.io.SeekableInputStream;
  * read the footer from cache without being aware of the latter.
  * This implements both InputFile and the InputStream that the reader gets from InputFile.
  */
-final class ParquetFooterInputFromCache
+public final class ParquetFooterInputFromCache
     extends SeekableInputStream implements InputFile {
-  final static int FOOTER_LENGTH_SIZE = 4; // For the file size check.
+  public final static int FOOTER_LENGTH_SIZE = 4; // For the file size check.
   private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE;
   private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length;
   private final int length, footerLength;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index e0e14863dfd..f7b13cb3d6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.BucketIdentifier;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
 import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -52,12 +54,10 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.parquet.ParquetRuntimeException;
-import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetInputSplit;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -83,7 +83,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
 
-import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 
 /**
@@ -104,7 +103,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private Object[] partitionValues;
   private Path cacheFsPath;
   private static final int MAP_DEFINITION_LEVEL_MAX = 3;
-  private final boolean isReadCacheOnly;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -150,11 +148,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       this.cacheConf = cacheConf;
 
       if (metadataCache != null) {
-        cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath,
-            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
-            HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
-            !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
-        // HdfsUtils.getFileId might yield to null in certain configurations
+        cacheKey = SyntheticFileId.fromJobConf(conf);
+        if (cacheKey == null) {
+          cacheKey = LlapHiveUtils.createFileIdUsingFS(filePath.getFileSystem(conf), filePath, cacheConf);
+        }
+        // createFileIdUsingFS() might yield null in certain configurations
         if (cacheKey != null) {
           cacheTag = cacheTagOfParquetFile(filePath, cacheConf, conf);
           // If we are going to use cache, change the path to depend on file ID for extra consistency.
@@ -169,7 +167,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
 
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       //initialize the rowbatchContext
-      isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
       rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
 
       if (parquetInputSplit != null) {
@@ -285,40 +282,14 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
 
   private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
       Object cacheKey, MetadataFilter filter, CacheTag tag) throws IOException {
-    MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
-        : metadataCache.getFileMetadata(cacheKey);
-    if (footerData != null) {
-      LOG.info("Found the footer in cache for " + cacheKey);
-      try {
-        return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
-      } finally {
-        metadataCache.decRefBuffer(footerData);
-      }
-    } else {
-      throwIfCacheOnlyRead(isReadCacheOnly);
-    }
-    final FileSystem fs = file.getFileSystem(configuration);
-    final FileStatus stat = fs.getFileStatus(file);
     if (cacheKey == null || metadataCache == null) {
+      // Non-LLAP case
+      FileSystem fs = file.getFileSystem(configuration);
+      FileStatus stat = fs.getFileStatus(file);
       return readFooterFromFile(file, fs, stat, filter);
-    }
-
-    // To avoid reading the footer twice, we will cache it first and then read from cache.
-    // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact.
-    try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(file))) {
-      long footerLengthIndex = stat.getLen()
-          - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length;
-      stream.seek(footerLengthIndex);
-      int footerLength = BytesUtils.readIntLittleEndian(stream);
-      stream.seek(footerLengthIndex - footerLength);
-      LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
-      // Note: we don't pass in isStopped here - this is not on an IO thread.
-      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null);
-      try {
-        return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
-      } finally {
-        metadataCache.decRefBuffer(footerData);
-      }
+    } else {
+      MemoryBufferOrBuffers footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(file, configuration, cacheKey);
+      return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
     }
   }
 
@@ -337,7 +308,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     return ParquetFileReader.readFooter(inputFile, filter);
   }
 
-  private static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
+  public static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
     MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
     if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE) || mapWork == null) {
       return null;