Posted to commits@hive.apache.org by sz...@apache.org on 2022/08/08 12:42:21 UTC
[hive] branch master updated: HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fa111f15008 HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter)
fa111f15008 is described below
commit fa111f150086f0069e3a6a5b8e6f34a4afae3b1b
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Mon Aug 8 14:42:16 2022 +0200
HIVE-26115: LLAP cache utilization for Iceberg Parquet files (#3480) (Adam Szita, reviewed by Laszlo Pinter)
---
.../mr/hive/vector/HiveVectorizedReader.java | 22 +-
.../parquet}/ParquetFooterInputFromCache.java | 122 +++---
.../queries/positive/llap_iceberg_read_parquet.q | 122 ++++++
.../positive/llap/llap_iceberg_read_parquet.q.out | 465 +++++++++++++++++++++
.../test/resources/testconfiguration.properties | 4 +-
.../org/apache/hadoop/hive/llap/io/api/LlapIo.java | 17 +
.../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 58 +++
.../hive/llap/io/encoded/LlapOrcCacheLoader.java | 4 +-
.../hive/llap/io/encoded/OrcEncodedDataReader.java | 18 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 11 +-
.../org/apache/hadoop/hive/llap/LlapHiveUtils.java | 30 +-
.../apache/hadoop/hive/ql/io/SyntheticFileId.java | 29 ++
.../vector/ParquetFooterInputFromCache.java | 4 +-
.../vector/VectorizedParquetRecordReader.java | 57 +--
14 files changed, 839 insertions(+), 124 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 00b9b3c73f0..6e23dff92c6 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -42,7 +43,9 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hive.iceberg.org.apache.orc.OrcConf;
+import org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -53,6 +56,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.orc.VectorizedReadUtils;
+import org.apache.iceberg.parquet.ParquetFooterInputFromCache;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -125,6 +129,7 @@ public class HiveVectorizedReader {
// TODO: Iceberg currently does not track the last modification time of a file. Until that's added,
// we need to set Long.MIN_VALUE as last modification time in the fileId triplet.
SyntheticFileId fileId = new SyntheticFileId(path, task.file().fileSizeInBytes(), Long.MIN_VALUE);
+ fileId.toJobConf(job);
RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
switch (format) {
@@ -133,7 +138,7 @@ public class HiveVectorizedReader {
break;
case PARQUET:
- recordReader = parquetRecordReader(job, reporter, task, path, start, length);
+ recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId);
break;
default:
throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format);
@@ -182,11 +187,22 @@ public class HiveVectorizedReader {
}
private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader(JobConf job, Reporter reporter,
- FileScanTask task, Path path, long start, long length) throws IOException {
+ FileScanTask task, Path path, long start, long length, SyntheticFileId fileId) throws IOException {
InputSplit split = new FileSplit(path, start, length, job);
VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat();
- MessageType fileSchema = ParquetFileReader.readFooter(job, path).getFileMetaData().getSchema();
+ MemoryBufferOrBuffers footerData = null;
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
+ LlapProxy.getIo() != null) {
+ LlapProxy.getIo().initCacheOnlyInputFormat(inputFormat);
+ footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(path, job, fileId);
+ }
+
+ ParquetMetadata parquetMetadata = footerData != null ?
+ ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER) :
+ ParquetFileReader.readFooter(job, path);
+
+ MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
MessageType typeWithIds = null;
Schema expectedSchema = task.spec().schema();
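
Condensed, the new read path in HiveVectorizedReader is a cache-first footer lookup with a
disk fallback. A minimal sketch of the pattern, using only classes already imported in the
hunk above (only the method boundary is added for readability):

    // Cache-first Parquet footer read with disk fallback, condensed from the hunk above.
    private static ParquetMetadata readFooterPreferringCache(JobConf job, Path path,
        SyntheticFileId fileId, VectorizedParquetInputFormat inputFormat) throws IOException {
      MemoryBufferOrBuffers footerData = null;
      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())
          && LlapProxy.getIo() != null) {
        // Let LLAP hand back the cached footer buffers; a miss fills the cache from disk.
        LlapProxy.getIo().initCacheOnlyInputFormat(inputFormat);
        footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(path, job, fileId);
      }
      return footerData != null
          ? ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData),
              ParquetMetadataConverter.NO_FILTER)
          : ParquetFileReader.readFooter(job, path); // LLAP IO off or unavailable: disk read
    }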
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
similarity index 59%
copy from ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
copy to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
index e2e60670cae..bd4bd74106f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
@@ -1,42 +1,52 @@
/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hadoop.hive.ql.io.parquet.vector;
+package org.apache.iceberg.parquet;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
-
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.io.InputFile;
-import org.apache.parquet.io.SeekableInputStream;
+import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.hive.iceberg.org.apache.parquet.io.InputFile;
+import org.apache.hive.iceberg.org.apache.parquet.io.SeekableInputStream;
/**
+ * Copy of ParquetFooterInputFromCache from hive-exec module to switch dependent Parquet packages
+ * to the shaded version (org.apache.hive.iceberg.org.apache.parquet.io...)
+ *
* The Parquet InputFile implementation that allows the reader to
* read the footer from cache without being aware of the latter.
* This implements both InputFile and the InputStream that the reader gets from InputFile.
*/
-final class ParquetFooterInputFromCache
+public final class ParquetFooterInputFromCache
extends SeekableInputStream implements InputFile {
- final static int FOOTER_LENGTH_SIZE = 4; // For the file size check.
+ public static final int FOOTER_LENGTH_SIZE = 4; // For the file size check.
private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE;
private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length;
- private final int length, footerLength;
- private int position = 0, bufferIx = 0, bufferPos = 0;
+ private final int length;
+ private final int footerLength;
+ private int position = 0;
+ private int bufferIx = 0;
+ private int bufferPos = 0;
private final MemoryBuffer[] cacheData;
private final int[] positions;
@@ -50,18 +60,18 @@ final class ParquetFooterInputFromCache
cacheData = new MemoryBuffer[bufs.length + 1];
System.arraycopy(bufs, 0, cacheData, 0, bufs.length);
}
- int footerLength = 0;
+ int footerLen = 0;
positions = new int[cacheData.length];
for (int i = 0; i < cacheData.length - 1; ++i) {
- positions[i] = footerLength;
+ positions[i] = footerLen;
int dataLen = cacheData[i].getByteBufferRaw().remaining();
assert dataLen > 0;
- footerLength += dataLen;
+ footerLen += dataLen;
}
- positions[cacheData.length - 1] = footerLength;
- cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLength);
- this.footerLength = footerLength;
- this.length = footerLength + FAKE_PREFIX_LENGTH + TAIL_LENGTH;
+ positions[cacheData.length - 1] = footerLen;
+ cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLen);
+ this.footerLength = footerLen;
+ this.length = footerLen + FAKE_PREFIX_LENGTH + TAIL_LENGTH;
}
@Override
@@ -71,7 +81,7 @@ final class ParquetFooterInputFromCache
@Override
public SeekableInputStream newStream() throws IOException {
- // Note: this doesn't maintain proper newStream semantics (if any).
+ // Note: this doesn't maintain proper newStream semantics (if any).
// We could either clone this instead or enforce that this is only called once.
return this;
}
@@ -82,9 +92,9 @@ final class ParquetFooterInputFromCache
}
@Override
- public void seek(long targetPos) throws IOException {
- this.position = (int)targetPos;
- targetPos -= FAKE_PREFIX_LENGTH;
+ public void seek(long pos) throws IOException {
+ this.position = (int) pos;
+ long targetPos = pos - FAKE_PREFIX_LENGTH;
// Not efficient, but we don't expect this to be called frequently.
for (int i = 1; i <= positions.length; ++i) {
int endPos = (i == positions.length) ? (length - FAKE_PREFIX_LENGTH) : positions[i];
@@ -94,25 +104,32 @@ final class ParquetFooterInputFromCache
return;
}
}
- throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength
- + Arrays.toString(positions));
+ throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength +
+ Arrays.toString(positions));
}
@Override
public void readFully(byte[] b, int offset, int len) throws IOException {
- if (readInternal(b, offset, len) == len) return;
+ if (readInternal(b, offset, len) == len) {
+ return;
+ }
throw new EOFException();
}
- public int readInternal(byte[] b, int offset, int len) {
- if (position >= length) return -1;
- int argPos = offset, argEnd = offset + len;
+ public int readInternal(byte[] bytes, int offset, int len) {
+ if (position >= length) {
+ return -1;
+ }
+ int argPos = offset;
+ int argEnd = offset + len;
while (argPos < argEnd) {
- if (bufferIx == cacheData.length) return (argPos - offset);
+ if (bufferIx == cacheData.length) {
+ return argPos - offset;
+ }
ByteBuffer data = cacheData[bufferIx].getByteBufferDup();
int toConsume = Math.min(argEnd - argPos, data.remaining() - bufferPos);
data.position(data.position() + bufferPos);
- data.get(b, argPos, toConsume);
+ data.get(bytes, argPos, toConsume);
if (data.remaining() == 0) {
++bufferIx;
bufferPos = 0;
@@ -126,7 +143,9 @@ final class ParquetFooterInputFromCache
@Override
public int read() throws IOException {
- if (position >= length) return -1;
+ if (position >= length) {
+ return -1;
+ }
++position;
ByteBuffer data = cacheData[bufferIx].getByteBufferRaw();
int bp = bufferPos;
@@ -148,9 +167,9 @@ final class ParquetFooterInputFromCache
bb.position(bb.position() + result);
}
} else {
- byte[] b = new byte[bb.remaining()];
- result = readInternal(b, 0, bb.remaining());
- bb.put(b, 0, result);
+ byte[] bytes = new byte[bb.remaining()];
+ result = readInternal(bytes, 0, bb.remaining());
+ bb.put(bytes, 0, result);
}
return result;
}
@@ -169,18 +188,19 @@ final class ParquetFooterInputFromCache
* The fake buffer that emulates end of file, with footer length and magic. Given that these
* can be generated based on the footer buffer itself, we don't cache them.
*/
- private final static class FooterEndBuffer implements MemoryBuffer {
+ private static final class FooterEndBuffer implements MemoryBuffer {
private final ByteBuffer bb;
- public FooterEndBuffer(int footerLength) {
- byte[] b = new byte[8];
- b[0] = (byte) ((footerLength >>> 0) & 0xFF);
- b[1] = (byte) ((footerLength >>> 8) & 0xFF);
- b[2] = (byte) ((footerLength >>> 16) & 0xFF);
- b[3] = (byte) ((footerLength >>> 24) & 0xFF);
+
+ FooterEndBuffer(int footerLength) {
+ byte[] bytes = new byte[8];
+ bytes[0] = (byte) ((footerLength >>> 0) & 0xFF);
+ bytes[1] = (byte) ((footerLength >>> 8) & 0xFF);
+ bytes[2] = (byte) ((footerLength >>> 16) & 0xFF);
+ bytes[3] = (byte) ((footerLength >>> 24) & 0xFF);
for (int i = 0; i < ParquetFileWriter.MAGIC.length; ++i) {
- b[4 + i] = ParquetFileWriter.MAGIC[i];
+ bytes[4 + i] = ParquetFileWriter.MAGIC[i];
}
- bb = ByteBuffer.wrap(b);
+ bb = ByteBuffer.wrap(bytes);
}
@Override
@@ -193,4 +213,4 @@ final class ParquetFooterInputFromCache
return bb.duplicate();
}
}
-}
\ No newline at end of file
+}
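
The copied class works because a Parquet reader only needs the file to look plausible around
the footer: the 4-byte "PAR1" magic at the start, the footer bytes, then a 4-byte
little-endian footer length followed by the magic again. A self-contained sketch of that
8-byte tail, mirroring FooterEndBuffer above:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    final class ParquetTailSketch {
      private static final byte[] MAGIC = {'P', 'A', 'R', '1'}; // ParquetFileWriter.MAGIC

      // The 8-byte tail a Parquet reader expects after the footer:
      // the footer length as a little-endian int, then the magic bytes.
      static byte[] tailFor(int footerLength) {
        return ByteBuffer.allocate(8)
            .order(ByteOrder.LITTLE_ENDIAN)
            .putInt(footerLength)
            .put(MAGIC)
            .array();
      }
    }

Because this tail is fully derivable from the footer length, FooterEndBuffer synthesizes it
on the fly instead of caching it.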
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q
new file mode 100644
index 00000000000..d961dc74d6b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_parquet.q
@@ -0,0 +1,122 @@
+--test against vectorized LLAP execution mode
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+
+DROP TABLE IF EXISTS llap_orders_parquet PURGE;
+DROP TABLE IF EXISTS llap_items_parquet PURGE;
+DROP TABLE IF EXISTS mig_source_parquet PURGE;
+
+
+CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET;
+INSERT INTO llap_items_parquet VALUES
+(0, 35000, 'Sedan', 'Model 3', 'Standard range plus'),
+(1, 45000, 'Sedan', 'Model 3', 'Long range'),
+(2, 50000, 'Sedan', 'Model 3', 'Performance'),
+(3, 48000, 'Crossover', 'Model Y', 'Long range'),
+(4, 55000, 'Crossover', 'Model Y', 'Performance'),
+(5, 83000, 'Sports', 'Model S', 'Long range'),
+(6, 123000, 'Sports', 'Model S', 'Plaid');
+
+CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET;
+INSERT INTO llap_orders_parquet VALUES
+(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'),
+(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'),
+(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'),
+(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'),
+(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'),
+(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'),
+(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'),
+(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'),
+(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'),
+(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'),
+(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'),
+(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'),
+(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'),
+(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'),
+(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'),
+(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'),
+(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'),
+(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'),
+(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'),
+(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'),
+(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA');
+
+--select query without any schema change yet
+SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description;
+
+
+--schema evolution on unpartitioned table
+--renames and reorders
+ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description;
+ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name;
+SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description;
+
+--adding a column
+ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float);
+INSERT INTO llap_items_parquet VALUES
+(7, 'Model X', 93000, 'Long range', 'SUV', 3.8),
+(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5);
+SELECT cat, min(to60) from llap_items_parquet group by cat;
+
+--removing a column
+ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float);
+INSERT INTO llap_items_parquet VALUES
+(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5),
+(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5);
+SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name;
+
+
+--schema evolution on partitioned table (including partition changes)
+--renames and reorders
+ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2;
+ALTER TABLE llap_orders_parquet CHANGE p1 region string;
+INSERT INTO llap_orders_parquet VALUES
+(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129'));
+SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region;
+
+ALTER TABLE llap_orders_parquet CHANGE p2 state string;
+SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state;
+
+--adding new column
+ALTER TABLE llap_orders_parquet ADD COLUMNS (city string);
+INSERT INTO llap_orders_parquet VALUES
+(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München');
+SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+--making it a partition column
+ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city);
+INSERT INTO llap_orders_parquet VALUES
+(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia');
+SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+--de-partitioning a column
+ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city);
+INSERT INTO llap_orders_parquet VALUES
+(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London');
+SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+--removing a column from schema
+ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string);
+INSERT INTO llap_orders_parquet VALUES
+(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris');
+SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state;
+
+
+--some more projections
+SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name;
+SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description;
+
+---------------------------------------------
+--Test migrated partitioned table gets cached
+
+CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET;
+INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US');
+ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');
+
+-- Should miss, but fill cache
+SELECT region, SUM(id) from mig_source_parquet GROUP BY region;
+
+-- Should hit cache
+set hive.llap.io.cache.only=true;
+SELECT region, SUM(id) from mig_source_parquet GROUP BY region;
+set hive.llap.io.cache.only=false;
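
The migration block above doubles as a cache assertion: the first SELECT is expected to miss
and populate the cache, and with hive.llap.io.cache.only=true the second SELECT must be
answered entirely from cache or fail. On the daemon side the flag is enforced by the
throwIfCacheOnlyRead guard imported in LlapIoImpl further down; a plausible sketch of that
helper (assumed shape, since its body is not part of this diff):

    // Assumed shape of the cache-only guard behind hive.llap.io.cache.only.
    static void throwIfCacheOnlyRead(boolean isCacheOnlyRead) throws IOException {
      if (isCacheOnlyRead) {
        throw new IOException("LLAP cache miss while "
            + HiveConf.ConfVars.LLAP_IO_CACHE_ONLY.varname + " is set");
      }
    }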
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out
new file mode 100644
index 00000000000..a7ae1a2c34d
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_parquet.q.out
@@ -0,0 +1,465 @@
+PREHOOK: query: DROP TABLE IF EXISTS llap_orders_parquet PURGE
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS llap_orders_parquet PURGE
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS llap_items_parquet PURGE
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS llap_items_parquet PURGE
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: DROP TABLE IF EXISTS mig_source_parquet PURGE
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS mig_source_parquet PURGE
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: CREATE EXTERNAL TABLE llap_items_parquet (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: INSERT INTO llap_items_parquet VALUES
+(0, 35000, 'Sedan', 'Model 3', 'Standard range plus'),
+(1, 45000, 'Sedan', 'Model 3', 'Long range'),
+(2, 50000, 'Sedan', 'Model 3', 'Performance'),
+(3, 48000, 'Crossover', 'Model Y', 'Long range'),
+(4, 55000, 'Crossover', 'Model Y', 'Performance'),
+(5, 83000, 'Sports', 'Model S', 'Long range'),
+(6, 123000, 'Sports', 'Model S', 'Plaid')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: INSERT INTO llap_items_parquet VALUES
+(0, 35000, 'Sedan', 'Model 3', 'Standard range plus'),
+(1, 45000, 'Sedan', 'Model 3', 'Long range'),
+(2, 50000, 'Sedan', 'Model 3', 'Performance'),
+(3, 48000, 'Crossover', 'Model Y', 'Long range'),
+(4, 55000, 'Crossover', 'Model Y', 'Performance'),
+(5, 83000, 'Sports', 'Model S', 'Long range'),
+(6, 123000, 'Sports', 'Model S', 'Plaid')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: CREATE EXTERNAL TABLE llap_orders_parquet (orderid INT, quantity INT, itemid INT, tradets TIMESTAMP) PARTITIONED BY (p1 STRING, p2 STRING) STORED BY ICEBERG STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'),
+(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'),
+(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'),
+(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'),
+(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'),
+(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'),
+(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'),
+(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'),
+(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'),
+(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'),
+(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'),
+(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'),
+(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'),
+(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'),
+(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'),
+(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'),
+(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'),
+(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'),
+(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'),
+(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'),
+(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(0, 48, 5, timestamp('2000-06-04 19:55:46.129'), 'EU', 'DE'),
+(1, 12, 6, timestamp('2007-06-24 19:23:22.829'), 'US', 'TX'),
+(2, 76, 4, timestamp('2018-02-19 23:43:51.995'), 'EU', 'DE'),
+(3, 91, 5, timestamp('2000-07-15 09:09:11.587'), 'US', 'NJ'),
+(4, 18, 6, timestamp('2007-12-02 22:30:39.302'), 'EU', 'ES'),
+(5, 71, 5, timestamp('2010-02-08 20:31:23.430'), 'EU', 'DE'),
+(6, 78, 3, timestamp('2016-02-22 20:37:37.025'), 'EU', 'FR'),
+(7, 88, 0, timestamp('2020-03-26 18:47:40.611'), 'EU', 'FR'),
+(8, 87, 4, timestamp('2003-02-20 00:48:09.139'), 'EU', 'ES'),
+(9, 60, 6, timestamp('2012-08-28 01:35:54.283'), 'EU', 'IT'),
+(10, 24, 5, timestamp('2015-03-28 18:57:50.069'), 'US', 'NY'),
+(11, 42, 2, timestamp('2012-06-27 01:13:32.350'), 'EU', 'UK'),
+(12, 37, 4, timestamp('2020-08-09 01:18:50.153'), 'US', 'NY'),
+(13, 52, 1, timestamp('2019-09-04 01:46:19.558'), 'EU', 'UK'),
+(14, 96, 3, timestamp('2019-03-05 22:00:03.020'), 'US', 'NJ'),
+(15, 18, 3, timestamp('2001-09-11 00:14:12.687'), 'EU', 'FR'),
+(16, 46, 0, timestamp('2013-08-31 02:16:17.878'), 'EU', 'UK'),
+(17, 26, 5, timestamp('2001-02-01 20:05:32.317'), 'EU', 'FR'),
+(18, 68, 5, timestamp('2009-12-29 08:44:08.048'), 'EU', 'ES'),
+(19, 54, 6, timestamp('2015-08-15 01:59:22.177'), 'EU', 'HU'),
+(20, 10, 0, timestamp('2018-05-06 12:56:12.789'), 'US', 'CA')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.price >= 50000 GROUP BY i.name, i.description
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+Model 3 Performance 42
+Model S Long range 213
+Model S Plaid 132
+Model Y Performance 163
+PREHOOK: query: ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet CHANGE category cat string AFTER description
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet CHANGE price cost int AFTER name
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE p1 = 'EU' and i.cost >= 100000 GROUP BY i.name, i.description
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+Model S Plaid 132
+PREHOOK: query: ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float)
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet ADD COLUMNS (to60 float)
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: INSERT INTO llap_items_parquet VALUES
+(7, 'Model X', 93000, 'Long range', 'SUV', 3.8),
+(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: INSERT INTO llap_items_parquet VALUES
+(7, 'Model X', 93000, 'Long range', 'SUV', 3.8),
+(7, 'Model X', 113000, 'Plaid', 'SUV', 2.5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: SELECT cat, min(to60) from llap_items_parquet group by cat
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT cat, min(to60) from llap_items_parquet group by cat
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+Crossover NULL
+SUV 2.5
+Sedan NULL
+Sports NULL
+PREHOOK: query: ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: ALTER TABLE llap_items_parquet REPLACE COLUMNS (itemid int, name string, cost int, description string, to60 float)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: INSERT INTO llap_items_parquet VALUES
+(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5),
+(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_items_parquet
+POSTHOOK: query: INSERT INTO llap_items_parquet VALUES
+(8, 'Cybertruck', 40000, 'Single Motor RWD', 6.5),
+(9, 'Cybertruck', 50000, 'Dual Motor AWD', 4.5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_items_parquet
+PREHOOK: query: SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT name, min(to60), max(cost) FROM llap_items_parquet WHERE itemid > 3 GROUP BY name
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+#### A masked pattern was here ####
+Cybertruck 4.5 50000
+Model S NULL 123000
+Model X 2.5 113000
+Model Y NULL 55000
+PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE tradets ordertime timestamp AFTER p2
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p1 region string
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p1 region string
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129'))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(21, 21, 8, 'EU', 'HU', timestamp('2000-01-04 19:55:46.129'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+EU 2000-01-04 19:55:46.129 153
+US 2007-06-24 19:23:22.829 12
+PREHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p2 state string
+PREHOOK: type: ALTERTABLE_RENAMECOL
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet CHANGE p2 state string
+POSTHOOK: type: ALTERTABLE_RENAMECOL
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, state, min(ordertime), sum(quantity) FROM llap_orders_parquet WHERE itemid > 5 GROUP BY region, state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+EU ES 2007-12-02 22:30:39.302 18
+EU HU 2000-01-04 19:55:46.129 75
+EU IT 2012-08-28 01:35:54.283 60
+US TX 2007-06-24 19:23:22.829 12
+PREHOOK: query: ALTER TABLE llap_orders_parquet ADD COLUMNS (city string)
+PREHOOK: type: ALTERTABLE_ADDCOLS
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet ADD COLUMNS (city string)
+POSTHOOK: type: ALTERTABLE_ADDCOLS
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(22, 99, 9, 'EU', 'DE', timestamp('2021-01-04 19:55:46.129'), 'München')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE München
+ES NULL
+FR NULL
+HU NULL
+IT NULL
+UK NULL
+PREHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city)
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (region, state, city)
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(23, 89, 6, 'EU', 'IT', timestamp('2021-02-04 19:55:46.129'), 'Venezia')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE München 5.75
+ES NULL 5.0
+FR NULL 2.75
+HU NULL 7.0
+IT Venezia 6.0
+UK NULL 1.0
+PREHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city)
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet SET PARTITION SPEC (state, city)
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(24, 88, 5, 'EU', 'UK', timestamp('2006-02-04 19:55:46.129'), 'London')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE München 5.75
+ES NULL 5.0
+FR NULL 2.75
+HU NULL 7.0
+IT Venezia 6.0
+UK London 2.0
+PREHOOK: query: ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string)
+PREHOOK: type: ALTERTABLE_REPLACECOLS
+PREHOOK: Input: default@llap_orders_parquet
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: ALTER TABLE llap_orders_parquet REPLACE COLUMNS (quantity int, itemid int, region string, state string, ordertime timestamp, city string)
+POSTHOOK: type: ALTERTABLE_REPLACECOLS
+POSTHOOK: Input: default@llap_orders_parquet
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@llap_orders_parquet
+POSTHOOK: query: INSERT INTO llap_orders_parquet VALUES
+(88, 5, 'EU', 'FR', timestamp('2006-02-04 19:55:46.129'), 'Paris')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@llap_orders_parquet
+PREHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT state, max(city), avg(itemid) from llap_orders_parquet WHERE region = 'EU' GROUP BY state
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+DE München 5.75
+ES NULL 5.0
+FR Paris 3.2
+HU NULL 7.0
+IT Venezia 6.0
+UK London 2.0
+PREHOOK: query: SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT o.city, i.name, min(i.cost), max(to60), sum(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 and ordertime > timestamp('2010-01-01') GROUP BY o.city, i.name
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+München Cybertruck 50000 4.5 99
+NULL Model 3 50000 NULL 42
+Venezia Model S 123000 NULL 89
+NULL Model S 83000 NULL 185
+NULL Model Y 55000 NULL 76
+PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description
+PREHOOK: type: QUERY
+PREHOOK: Input: default@llap_items_parquet
+PREHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items_parquet i JOIN llap_orders_parquet o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@llap_items_parquet
+POSTHOOK: Input: default@llap_orders_parquet
+#### A masked pattern was here ####
+Cybertruck Dual Motor AWD 99
+Model 3 Performance 42
+Model S Long range 389
+Model S Plaid 221
+Model Y Performance 163
+PREHOOK: query: CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@mig_source_parquet
+POSTHOOK: query: CREATE EXTERNAL TABLE mig_source_parquet (id int) partitioned by (region string) stored AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@mig_source_parquet
+PREHOOK: query: INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@mig_source_parquet
+POSTHOOK: query: INSERT INTO mig_source_parquet VALUES (1, 'EU'), (1, 'US'), (2, 'EU'), (3, 'EU'), (2, 'US')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@mig_source_parquet
+POSTHOOK: Output: default@mig_source_parquet@region=EU
+POSTHOOK: Output: default@mig_source_parquet@region=US
+POSTHOOK: Lineage: mig_source_parquet PARTITION(region=EU).id SCRIPT []
+POSTHOOK: Lineage: mig_source_parquet PARTITION(region=US).id SCRIPT []
+PREHOOK: query: ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')
+PREHOOK: type: ALTERTABLE_PROPERTIES
+PREHOOK: Input: default@mig_source_parquet
+PREHOOK: Output: default@mig_source_parquet
+POSTHOOK: query: ALTER TABLE mig_source_parquet SET TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')
+POSTHOOK: type: ALTERTABLE_PROPERTIES
+POSTHOOK: Input: default@mig_source_parquet
+POSTHOOK: Output: default@mig_source_parquet
+PREHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+EU 6
+US 3
+PREHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+PREHOOK: type: QUERY
+PREHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT region, SUM(id) from mig_source_parquet GROUP BY region
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@mig_source_parquet
+#### A masked pattern was here ####
+EU 6
+US 3
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index b32ad20d577..0b589fe4d36 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -404,9 +404,11 @@ erasurecoding.only.query.files=\
iceberg.llap.query.files=\
llap_iceberg_read_orc.q,\
+ llap_iceberg_read_parquet.q,\
vectorized_iceberg_read_mixed.q,\
vectorized_iceberg_read_orc.q,\
vectorized_iceberg_read_parquet.q
iceberg.llap.only.query.files=\
- llap_iceberg_read_orc.q
+ llap_iceberg_read_orc.q,\
+ llap_iceberg_read_parquet.q
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index a4fc13a0ee0..c650288a93a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -62,6 +64,21 @@ public interface LlapIo<T> {
*/
OrcTail getOrcTailFromCache(Path path, Configuration conf, CacheTag tag, @Nullable Object fileKey) throws IOException;
+
+ /**
+ * Returns the metadata buffers associated with the Parquet file on the given path.
+ * Content is either obtained from cache, or from disk if there is a cache miss.
+ * @param path Parquet file path
+ * @param conf jobConf
+ * @param fileKey fileId of the Parquet file (either the Long fileId of HDFS or the SyntheticFileId).
+ * Optional, if it is not provided, it will be generated, see:
+ * org.apache.hadoop.hive.ql.io.HdfsUtils#getFileId()
+ * @return the footer buffers of the Parquet file (from cache, or read from disk and cached on a miss)
+ * @throws IOException
+ */
+ MemoryBufferOrBuffers getParquetFooterBuffersFromCache(Path path, JobConf conf, @Nullable Object fileKey)
+ throws IOException;
+
/**
* Handles request to evict entities specified in the request object.
* @param protoRequest lists Hive entities (DB, table, etc..) whose LLAP buffers should be evicted.
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index ecf8d2575c9..4634c4639f0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -27,10 +27,14 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import javax.annotation.Nullable;
import javax.management.ObjectName;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
import org.apache.hadoop.hive.llap.ProactiveEviction;
import org.apache.hadoop.hive.llap.cache.LlapCacheHydration;
import org.apache.hadoop.hive.llap.cache.MemoryLimitedPathCache;
@@ -82,6 +86,8 @@ import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.vector.ParquetFooterInputFromCache;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
@@ -93,12 +99,19 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.orc.impl.OrcTail;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
+
public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
public static final Logger LOG = LoggerFactory.getLogger("LlapIoImpl");
public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc");
@@ -455,6 +468,50 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
}
}
+ @Override
+ public MemoryBufferOrBuffers getParquetFooterBuffersFromCache(Path path, JobConf conf, @Nullable Object fileKey)
+ throws IOException {
+
+ Preconditions.checkNotNull(fileMetadataCache, "Metadata cache must not be null");
+
+ boolean isReadCacheOnly = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_CACHE_ONLY);
+ CacheTag tag = VectorizedParquetRecordReader.cacheTagOfParquetFile(path, daemonConf, conf);
+
+ MemoryBufferOrBuffers footerData = (fileKey == null ) ? null
+ : fileMetadataCache.getFileMetadata(fileKey);
+ if (footerData != null) {
+ LOG.info("Found the footer in cache for " + fileKey);
+ try {
+ return footerData;
+ } finally {
+ fileMetadataCache.decRefBuffer(footerData);
+ }
+ } else {
+ throwIfCacheOnlyRead(isReadCacheOnly);
+ }
+
+ final FileSystem fs = path.getFileSystem(conf);
+ final FileStatus stat = fs.getFileStatus(path);
+
+ // To avoid reading the footer twice, we will cache it first and then read from cache.
+ // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact.
+ try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(path))) {
+ long footerLengthIndex = stat.getLen()
+ - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length;
+ stream.seek(footerLengthIndex);
+ int footerLength = BytesUtils.readIntLittleEndian(stream);
+ stream.seek(footerLengthIndex - footerLength);
+ LOG.info("Caching the footer of length " + footerLength + " for " + fileKey);
+ // Note: we don't pass in isStopped here - this is not on an IO thread.
+ footerData = fileMetadataCache.putFileMetadata(fileKey, footerLength, stream, tag, null);
+ try {
+ return footerData;
+ } finally {
+ fileMetadataCache.decRefBuffer(footerData);
+ }
+ }
+ }
+
@Override
public LlapDaemonProtocolProtos.CacheEntryList fetchCachedContentInfo() {
if (useLowLevelCache) {
@@ -479,4 +536,5 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
LOG.warn("Cannot load data into the cache. Low level cache is disabled.");
}
}
+
}
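
getParquetFooterBuffersFromCache locates the footer via the standard Parquet tail layout:
seek to eight bytes before the end of the file, read the 4-byte little-endian footer length,
then seek back by that length. The offset arithmetic from the hunk above, isolated into a
sketch:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.parquet.bytes.BytesUtils;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.io.SeekableInputStream;

    // Positions the stream at the first byte of the footer and returns the footer
    // length, given the layout [magic][data][footer][4-byte LE length][magic].
    static int seekToFooter(SeekableInputStream stream, FileStatus stat) throws IOException {
      long footerLengthIndex = stat.getLen()
          - 4 /* FOOTER_LENGTH_SIZE */ - ParquetFileWriter.MAGIC.length;
      stream.seek(footerLengthIndex);
      int footerLength = BytesUtils.readIntLittleEndian(stream);
      stream.seek(footerLengthIndex - footerLength);
      return footerLength;
    }

Caching first and then reading back from the cache avoids parsing the stream twice: as the
comment in the hunk notes, Parquet consumes the stream directly, so the bytes cannot be
captured after the fact.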
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java
index e4e38392b75..1fe56c08dd6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/LlapOrcCacheLoader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
@@ -42,7 +43,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Supplier;
-import static org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.determineFileId;
import static org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.getFsSupplier;
/**
@@ -77,7 +77,7 @@ public class LlapOrcCacheLoader implements AutoCloseable {
public void init() throws IOException {
fsSupplier = getFsSupplier(path, daemonConf);
- Object fileKey = determineFileId(fsSupplier, path, daemonConf);
+ Object fileKey = LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), path, daemonConf);
if(!fileKey.equals(this.fileKey)) {
throw new IOException("File key mismatch.");
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index aa03bde34e5..7d840599c9a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -499,8 +499,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
return true;
}
- private static Object determineFileId(Supplier<FileSystem> fsSupplier,
- FileSplit split, Configuration daemonConf) throws IOException {
+ private static Object determineFileId(Supplier<FileSystem> fsSupplier, FileSplit split, Configuration daemonConf)
+ throws IOException {
if (split instanceof OrcSplit) {
Object fileKey = ((OrcSplit)split).getFileKey();
@@ -509,17 +509,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
}
LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
- return determineFileId(fsSupplier, split.getPath(), daemonConf);
- }
-
- static Object determineFileId(Supplier<FileSystem> fsSupplier, Path path, Configuration daemonConf)
- throws IOException {
-
- boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
- boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID);
- boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH);
-
- return HdfsUtils.getFileId(fsSupplier.get(), path, allowSynthetic, checkDefaultFs, forceSynthetic);
+ return LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), split.getPath(), daemonConf);
}
/**
@@ -590,7 +580,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
Configuration daemonConf, MetadataCache metadataCache, Object fileKey) throws IOException {
Supplier<FileSystem> fsSupplier = getFsSupplier(path, jobConf);
if (fileKey == null) {
- fileKey = determineFileId(fsSupplier, path, daemonConf);
+ fileKey = LlapHiveUtils.createFileIdUsingFS(fsSupplier.get(), path, daemonConf);
}
if(fileKey == null || metadataCache == null) {
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index f9058ef7956..40b745e09ff 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCa
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
@@ -224,10 +223,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
fs = split.getPath().getFileSystem(daemonConf);
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
- fileKey = determineCacheKey(fs, split, partitionDesc,
- HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
- HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
- !HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
+ fileKey = determineCacheKey(fs, split, partitionDesc, daemonConf);
cacheTag = HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_TRACK_CACHE_USAGE)
? LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), true, partitionDesc) : null;
this.sourceInputFormat = sourceInputFormat;
@@ -1733,12 +1729,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private static Object determineCacheKey(FileSystem fs, FileSplit split, PartitionDesc partitionDesc,
- boolean allowSynthetic, boolean checkDefaultFs, boolean forceSynthetic) throws IOException {
+ Configuration daemonConf)
+ throws IOException {
/* TODO: support this optionally? this is not OrcSplit, but we could add a custom split.
Object fileKey = ((OrcSplit)split).getFileKey();
if (fileKey != null) return fileKey; */
LlapIoImpl.LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
- Object fileId = HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic, checkDefaultFs, forceSynthetic);
+ Object fileId = LlapHiveUtils.createFileIdUsingFS(fs, split.getPath(), daemonConf);
return SchemaAwareCacheKey.buildCacheKey(fileId, LlapHiveUtils.getSchemaHash(partitionDesc));
}
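
For context on the schema-aware key assembled above: it pairs the file ID with a hash of the partition schema, so two reads of the same file under different schemas cannot share cached encoded data. The class below is an illustrative, hypothetical stand-in for that idea, not Hive's actual SchemaAwareCacheKey implementation.

    import java.util.Objects;

    // Hypothetical sketch of a schema-aware cache key; names and structure are assumptions.
    final class SchemaAwareKeySketch {
      private final Object fileId;  // Long inode or SyntheticFileId, as produced by createFileIdUsingFS()
      private final int schemaHash; // hash of the partition's serialization schema

      SchemaAwareKeySketch(Object fileId, int schemaHash) {
        this.fileId = fileId;
        this.schemaHash = schemaHash;
      }

      // Both components participate in equality: same file + different schema => different cache entry.
      @Override
      public boolean equals(Object o) {
        if (!(o instanceof SchemaAwareKeySketch)) {
          return false;
        }
        SchemaAwareKeySketch other = (SchemaAwareKeySketch) o;
        return schemaHash == other.schemaHash && Objects.equals(fileId, other.fileId);
      }

      @Override
      public int hashCode() {
        return Objects.hash(fileId, schemaHash);
      }
    }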
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
index 52126d971c1..ba62b8d89c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapHiveUtils.java
@@ -18,16 +18,18 @@
package org.apache.hadoop.hive.llap;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -145,4 +147,30 @@ public final class LlapHiveUtils {
return "llap".equalsIgnoreCase(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
}
+ /**
+ * Determines the file ID for the given path using the provided FileSystem instance, taking the daemon
+ * configuration into account.
+ * Invokes HdfsUtils.getFileId(); the resulting file ID is either a Long (HDFS inode) or a SyntheticFileId,
+ * depending on the FS type and the actual daemon configuration.
+ * Can be costly on cloud file systems.
+ * @param fs FileSystem instance the path belongs to
+ * @param path Path of the file
+ * @param daemonConf LLAP daemon configuration
+ * @return the generated file ID; may be null in special cases (e.g. conf disallows synthetic IDs on a non-HDFS FS)
+ * @throws IOException if the file status cannot be retrieved from the file system
+ */
+ public static Object createFileIdUsingFS(FileSystem fs, Path path, Configuration daemonConf) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will invoke HdfsUtils.getFileId - this is costly on cloud file systems. " +
+ "Enable TRACE level logging to see the call trace.");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(Arrays.deepToString(Thread.currentThread().getStackTrace()));
+ }
+ }
+ boolean allowSynthetic = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID);
+ boolean checkDefaultFs = HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID);
+ boolean forceSynthetic = !HiveConf.getBoolVar(daemonConf, HiveConf.ConfVars.LLAP_IO_USE_FILEID_PATH);
+
+ return HdfsUtils.getFileId(fs, path, allowSynthetic, checkDefaultFs, forceSynthetic);
+ }
+
}
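
A minimal usage sketch for the new helper follows; the path below is a placeholder and a bare Configuration stands in for a real LLAP daemon configuration.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.llap.LlapHiveUtils;

    public class FileIdExample {
      public static void main(String[] args) throws IOException {
        Configuration daemonConf = new Configuration(); // stand-in for the daemon configuration
        Path path = new Path("/warehouse/db.db/tbl/000000_0"); // placeholder path

        FileSystem fs = path.getFileSystem(daemonConf);
        // Yields a Long inode on HDFS, a SyntheticFileId elsewhere (when synthetic IDs are allowed),
        // or null when the configuration disallows a usable ID for this file system.
        Object fileKey = LlapHiveUtils.createFileIdUsingFS(fs, path, daemonConf);
        if (fileKey == null) {
          System.out.println("No usable cache key; the LLAP cache is bypassed for this file.");
        } else {
          System.out.println("Cache key: " + fileKey + " (" + fileKey.getClass().getSimpleName() + ")");
        }
      }
    }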
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
index dc79e1076b0..56df1547910 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
@@ -21,12 +21,19 @@ package org.apache.hadoop.hive.ql.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
public final class SyntheticFileId implements Writable {
+
+ private static final String JOBCONF_KEY = "SYNTHETIC_FILE_ID";
+ private static final Pattern STRING_PATTERN = Pattern.compile("\\[(-?\\d+),\\s(-?\\d+),\\s(\\d+)\\]");
+
private long pathHash;
private long modTime;
private long length;
@@ -41,6 +48,16 @@ public final class SyntheticFileId implements Writable {
this.length = len;
}
+ private SyntheticFileId(String fileIdAsString) {
+ Matcher matcher = STRING_PATTERN.matcher(fileIdAsString);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Expected format " + STRING_PATTERN + " but got " + fileIdAsString);
+ }
+ this.pathHash = Long.parseLong(matcher.group(1));
+ this.modTime = Long.parseLong(matcher.group(2));
+ this.length = Long.parseLong(matcher.group(3));
+ }
+
public SyntheticFileId(FileStatus file) {
this(file.getPath(), file.getLen(), file.getModificationTime());
}
@@ -109,4 +126,16 @@ public final class SyntheticFileId implements Writable {
public long getLength() {
return length;
}
+
+ public void toJobConf(JobConf job) {
+ job.set(JOBCONF_KEY, this.toString());
+ }
+
+ public static SyntheticFileId fromJobConf(JobConf job) {
+ String idAsString = job.get(JOBCONF_KEY);
+ if (idAsString == null) {
+ return null;
+ }
+ return new SyntheticFileId(idAsString);
+ }
}
\ No newline at end of file
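
The new accessors above let a cache key survive a trip through the job configuration, e.g. from split generation to the record reader. The round-trip sketch below assumes SyntheticFileId#toString() renders the three fields as "[pathHash, modTime, length]", which is the format STRING_PATTERN parses back.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;
    import org.apache.hadoop.mapred.JobConf;

    public class SyntheticFileIdRoundTrip {
      public static void main(String[] args) {
        // (path, length, modification time) - placeholder values
        SyntheticFileId original =
            new SyntheticFileId(new Path("/tmp/data.parquet"), 1024L, 1659960136000L);

        // Producer side (e.g. while planning splits): stash the ID in the job configuration.
        JobConf job = new JobConf();
        original.toJobConf(job);

        // Consumer side (e.g. the record reader): recover the same cache key, or null if unset.
        SyntheticFileId restored = SyntheticFileId.fromJobConf(job);
        System.out.println(original.equals(restored)); // expected: true
      }
    }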
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
index e2e60670cae..49960caf7aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
@@ -30,9 +30,9 @@ import org.apache.parquet.io.SeekableInputStream;
* read the footer from cache without being aware of the latter.
* This implements both InputFile and the InputStream that the reader gets from InputFile.
*/
-final class ParquetFooterInputFromCache
+public final class ParquetFooterInputFromCache
extends SeekableInputStream implements InputFile {
- final static int FOOTER_LENGTH_SIZE = 4; // For the file size check.
+ public static final int FOOTER_LENGTH_SIZE = 4; // For the file size check.
private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE;
private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length;
private final int length, footerLength;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index e0e14863dfd..f7b13cb3d6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -30,12 +30,14 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -52,12 +54,10 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.parquet.ParquetRuntimeException;
-import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -83,7 +83,6 @@ import java.util.List;
import java.util.Set;
import java.util.TreeMap;
-import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
/**
@@ -104,7 +103,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private Object[] partitionValues;
private Path cacheFsPath;
private static final int MAP_DEFINITION_LEVEL_MAX = 3;
- private final boolean isReadCacheOnly;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -150,11 +148,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
this.cacheConf = cacheConf;
if (metadataCache != null) {
- cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath,
- HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
- HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
- !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
- // HdfsUtils.getFileId might yield to null in certain configurations
+ cacheKey = SyntheticFileId.fromJobConf(conf);
+ if (cacheKey == null) {
+ cacheKey = LlapHiveUtils.createFileIdUsingFS(filePath.getFileSystem(conf), filePath, cacheConf);
+ }
+ // createFileIdUsingFS() might yield null in certain configurations
if (cacheKey != null) {
cacheTag = cacheTagOfParquetFile(filePath, cacheConf, conf);
// If we are going to use cache, change the path to depend on file ID for extra consistency.
@@ -169,7 +167,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
//initialize the rowbatchContext
- isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
if (parquetInputSplit != null) {
@@ -285,40 +282,14 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private ParquetMetadata readSplitFooter(JobConf configuration, final Path file,
Object cacheKey, MetadataFilter filter, CacheTag tag) throws IOException {
- MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
- : metadataCache.getFileMetadata(cacheKey);
- if (footerData != null) {
- LOG.info("Found the footer in cache for " + cacheKey);
- try {
- return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
- } finally {
- metadataCache.decRefBuffer(footerData);
- }
- } else {
- throwIfCacheOnlyRead(isReadCacheOnly);
- }
- final FileSystem fs = file.getFileSystem(configuration);
- final FileStatus stat = fs.getFileStatus(file);
if (cacheKey == null || metadataCache == null) {
+ // Non-LLAP case
+ FileSystem fs = file.getFileSystem(configuration);
+ FileStatus stat = fs.getFileStatus(file);
return readFooterFromFile(file, fs, stat, filter);
- }
-
- // To avoid reading the footer twice, we will cache it first and then read from cache.
- // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact.
- try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(file))) {
- long footerLengthIndex = stat.getLen()
- - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length;
- stream.seek(footerLengthIndex);
- int footerLength = BytesUtils.readIntLittleEndian(stream);
- stream.seek(footerLengthIndex - footerLength);
- LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
- // Note: we don't pass in isStopped here - this is not on an IO thread.
- footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream, tag, null);
- try {
- return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
- } finally {
- metadataCache.decRefBuffer(footerData);
- }
+ } else {
+ MemoryBufferOrBuffers footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(file, configuration, cacheKey);
+ return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
}
}
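
The seek-read-and-cache logic that used to live in this method now sits behind the LlapIo entry point this patch introduces (see the LlapIo.java and LlapIoImpl.java entries in the summary). The self-contained sketch below restates the new branch with simplified signatures; buffer ref-count handling is assumed to happen inside the LlapIo implementation.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
    import org.apache.hadoop.hive.llap.io.api.LlapProxy;
    import org.apache.hadoop.hive.ql.io.parquet.vector.ParquetFooterInputFromCache;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public final class FooterReadSketch {
      public static ParquetMetadata read(JobConf conf, Path file, Object cacheKey, MetadataFilter filter)
          throws IOException {
        if (cacheKey == null) {
          // Non-LLAP case: read the footer straight from the file system.
          FileSystem fs = file.getFileSystem(conf);
          return ParquetFileReader.readFooter(conf, fs.getFileStatus(file), filter);
        }
        // LLAP case: the daemon returns the cached footer buffers, loading and caching them on a miss.
        MemoryBufferOrBuffers footerData =
            LlapProxy.getIo().getParquetFooterBuffersFromCache(file, conf, cacheKey);
        return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
      }
    }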
@@ -337,7 +308,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
return ParquetFileReader.readFooter(inputFile, filter);
}
- private static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
+ public static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE) || mapWork == null) {
return null;