Posted to commits@hive.apache.org by pv...@apache.org on 2022/03/16 08:10:26 UTC

[hive] branch master updated: Revert "HIVE-25845: Support ColumnIndexes for Parq files (#3094)"

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new efddf73  Revert "HIVE-25845: Support ColumnIndexes for Parq files (#3094)"
efddf73 is described below

commit efddf739ff1ff2962fc25c7e3edd660e67b8963f
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Mon Mar 14 21:48:58 2022 +0100

    Revert "HIVE-25845: Support ColumnIndexes for Parq files (#3094)"
    
    This reverts commit 88054ce553604a6d939149faf4d1c3b037706aba.
---
 pom.xml                                              |  2 +-
 .../hive/ql/io/parquet/ParquetRecordReaderBase.java  |  3 ---
 .../vector/VectorizedParquetRecordReader.java        | 20 +++++++++-----------
 3 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/pom.xml b/pom.xml
index db34aa9..804aadd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -189,7 +189,7 @@
     <!-- used by druid storage handler -->
     <pac4j-saml.version>4.0.3</pac4j-saml.version>
     <paranamer.version>2.8</paranamer.version>
-    <parquet.version>1.11.2</parquet.version>
+    <parquet.version>1.11.1</parquet.version>
     <pig.version>0.16.0</pig.version>
     <plexus.version>1.5.6</plexus.version>
     <protobuf.version>2.5.0</protobuf.version>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 1227d52..5235edc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -107,19 +107,16 @@ public class ParquetRecordReaderBase {
       final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
       final long splitStart = ((FileSplit) oldSplit).getStart();
       final long splitLength = ((FileSplit) oldSplit).getLength();
-      long blockRowCount = 0;
       for (final BlockMetaData block : blocks) {
         final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
         if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
           splitGroup.add(block);
-          blockRowCount += block.getRowCount();
         }
       }
       if (splitGroup.isEmpty()) {
         LOG.warn("Skipping split, could not find row group in: " + oldSplit);
         return null;
       }
-      LOG.debug("split group size: {}, row count: {}", splitGroup.size(), blockRowCount);
 
       FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
       if (filter != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 26fb96c..d17ddd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -157,13 +157,12 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       jobConf = conf;
       isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
       rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
-      ParquetInputSplit inputSplit = getSplit(oldInputSplit, jobConf);
-      // use jobConf consistently throughout, as getSplit clones it & adds filter to it.
+      ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
       if (inputSplit != null) {
-        initialize(inputSplit, jobConf);
+        initialize(inputSplit, conf);
       }
       FileSplit fileSplit = (FileSplit) oldInputSplit;
-      initPartitionValues(fileSplit, jobConf);
+      initPartitionValues(fileSplit, conf);
       bucketIdentifier = BucketIdentifier.from(conf, fileSplit.getPath());
     } catch (Throwable e) {
       LOG.error("Failed to create the vectorized reader due to exception " + e);
@@ -271,6 +270,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       }
     }
 
+    for (BlockMetaData block : blocks) {
+      this.totalRowCount += block.getRowCount();
+    }
     this.fileSchema = footer.getFileMetaData().getSchema();
     this.writerTimezone = DataWritableReadSupport
         .getWriterTimeZoneId(footer.getFileMetaData().getKeyValueMetaData());
@@ -279,12 +281,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     requestedSchema = DataWritableReadSupport
       .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
 
-    //TODO: For data cache this needs to be fixed and passed to reader.
-    //Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
+    Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
     this.reader = new ParquetFileReader(
-      configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
-    this.totalRowCount = this.reader.getFilteredRecordCount();
-    LOG.debug("totalRowCount: {}", totalRowCount);
+      configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
   }
 
   private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
@@ -303,7 +302,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     for (BlockMetaData block : blocks) {
       for (ColumnChunkMetaData mc : block.getColumns()) {
         if (!includedCols.contains(mc.getPath())) continue;
-        LOG.info("Patch: chunkIndex: k:" + mc.getStartingPos() + ", val: " + mc.getStartingPos() + mc.getTotalSize());
         chunkIndex.put(mc.getStartingPos(), mc.getStartingPos() + mc.getTotalSize());
       }
     }
@@ -452,7 +450,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     if (rowsReturned != totalCountLoadedSoFar) {
       return;
     }
-    PageReadStore pages = reader.readNextFilteredRowGroup();
+    PageReadStore pages = reader.readNextRowGroup();
     if (pages == null) {
       throw new IOException("expecting more rows but reached last block. Read "
         + rowsReturned + " out of " + totalRowCount);
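
For readers following the revert: the functional change is in VectorizedParquetRecordReader, where the column-index-aware read path from HIVE-25845 (readNextFilteredRowGroup / getFilteredRecordCount, available since parquet-mr 1.11) is replaced by the older path that sums row counts from the row-group metadata and calls readNextRowGroup. The sketch below is not Hive code; it is a minimal standalone illustration of the two parquet-mr read paths, assuming parquet-mr 1.11.x on the classpath and a Parquet file path passed as the first argument (the class name RowGroupReadSketch and the printed labels are invented for the example).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;

public class RowGroupReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]); // path to an existing Parquet file, supplied by the caller

    try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
      // Pre-HIVE-25845 behaviour (what this revert restores): the total row count is the
      // sum of the row-group metadata counts, and whole row groups are read back.
      long totalRowCount = 0;
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        totalRowCount += block.getRowCount();
      }
      System.out.println("metadata row count: " + totalRowCount);

      PageReadStore pages;
      while ((pages = reader.readNextRowGroup()) != null) {
        System.out.println("row group rows: " + pages.getRowCount());
      }
    }

    try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
      // HIVE-25845 behaviour (removed by this revert): when a record filter is configured
      // in the Configuration, the filtered APIs can use column indexes to skip pages and
      // report the filtered record count instead of the raw metadata count. Without a
      // filter they fall back to reading the full row groups.
      System.out.println("filtered record count: " + reader.getFilteredRecordCount());

      PageReadStore pages;
      while ((pages = reader.readNextFilteredRowGroup()) != null) {
        System.out.println("filtered row group rows: " + pages.getRowCount());
      }
    }
  }
}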