You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/24 02:37:33 UTC

[spark] branch master updated: Revert "[SPARK-32703][SQL] Replace deprecated API calls from SpecificParquetRecordReaderBase"

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 80bad08  Revert "[SPARK-32703][SQL] Replace deprecated API calls from SpecificParquetRecordReaderBase"
80bad08 is described below

commit 80bad086c806fd507b1fb197b171f87333f2fb08
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Wed Feb 24 11:36:54 2021 +0900

    Revert "[SPARK-32703][SQL] Replace deprecated API calls from SpecificParquetRecordReaderBase"
    
    This reverts commit 27873280ffbd73be6df230b4497701794ac81d91.
---
 .../parquet/SpecificParquetRecordReaderBase.java   | 93 +++++++++++++++-------
 .../datasources/parquet/ParquetFileFormat.scala    | 10 ++-
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 18 +++--
 3 files changed, 85 insertions(+), 36 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 8d7a294..0c82c03 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,33 +32,36 @@ import java.util.Set;
 
 import scala.Option;
 
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.TaskContext;
 import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
 import org.apache.spark.util.AccumulatorV2;
@@ -88,16 +92,58 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    FileSplit split = (FileSplit) inputSplit;
+    ParquetInputSplit split = (ParquetInputSplit)inputSplit;
     this.file = split.getPath();
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+
+    ParquetMetadata footer;
+    List<BlockMetaData> blocks;
 
-    ParquetReadOptions options = HadoopReadOptions
-      .builder(configuration)
-      .withRange(split.getStart(), split.getStart() + split.getLength())
-      .build();
-    this.reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
-    this.fileSchema = reader.getFileMetaData().getSchema();
-    Map<String, String> fileMetadata = reader.getFileMetaData().getKeyValueMetaData();
+    // if task.side.metadata is set, rowGroupOffsets is null
+    if (rowGroupOffsets == null) {
+      // then we need to apply the predicate push down filter
+      footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+      MessageType fileSchema = footer.getFileMetaData().getSchema();
+      FilterCompat.Filter filter = getFilter(configuration);
+      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    } else {
+      // SPARK-33532: After SPARK-13883 and SPARK-13989, the parquet read process no longer
+      // enters this branch, because `ParquetInputSplit` is only constructed in the
+      // `ParquetFileFormat.buildReaderWithPartitionValues` and
+      // `ParquetPartitionReaderFactory.buildReaderBase` methods,
+      // and both explicitly set the `rowGroupOffsets` of the `ParquetInputSplit` to null.
+      // We didn't delete this branch because PARQUET-131 wanted to move this to the
+      // parquet-mr project.
+      // otherwise we find the row groups that were selected on the client
+      footer = readFooter(configuration, file, NO_FILTER);
+      Set<Long> offsets = new HashSet<>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      blocks = new ArrayList<>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          blocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (blocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+            "All the offsets listed in the split should be found in the file."
+                + " expected: " + Arrays.toString(rowGroupOffsets)
+                + " found: " + blocks
+                + " out of: " + Arrays.toString(foundRowGroupOffsets)
+                + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+    this.fileSchema = footer.getFileMetaData().getSchema();
+    Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
@@ -105,6 +151,8 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    this.reader = new ParquetFileReader(
+        configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
     this.totalRowCount = reader.getFilteredRecordCount();
 
     // For test purpose.
@@ -117,7 +165,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
       if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
         @SuppressWarnings("unchecked")
         AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
-        intAccum.add(reader.getRowGroups().size());
+        intAccum.add(blocks.size());
       }
     }
   }
@@ -151,21 +199,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
    */
   protected void initialize(String path, List<String> columns) throws IOException {
     Configuration config = new Configuration();
-    config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
-    config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    config.set("spark.sql.parquet.binaryAsString", "false");
+    config.set("spark.sql.parquet.int96AsTimestamp", "false");
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
-    ParquetReadOptions options = HadoopReadOptions
-      .builder(config)
-      .withRange(0, length)
-      .build();
-
-    ParquetMetadata footer;
-    try (ParquetFileReader reader = ParquetFileReader
-        .open(HadoopInputFile.fromPath(file, config), options)) {
-      footer = reader.getFooter();
-    }
+    ParquetMetadata footer = readFooter(config, file, range(0, length));
 
     List<BlockMetaData> blocks = footer.getBlocks();
     this.fileSchema = footer.getFileMetaData().getSchema();
@@ -188,8 +227,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
       }
     }
     this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
-    // unfortunately we'd have to create the reader again since there is no column projection
-    // in the new API.
     this.reader = new ParquetFileReader(
         config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
     this.totalRowCount = reader.getFilteredRecordCount();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index fa6e124..64a1ac8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -26,7 +26,6 @@ import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
@@ -260,7 +259,14 @@ class ParquetFileFormat
       assert(file.partitionValues.numFields == partitionSchema.size)
 
       val filePath = new Path(new URI(file.filePath))
-      val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+      val split =
+        new org.apache.parquet.hadoop.ParquetInputSplit(
+          filePath,
+          file.start,
+          file.start + file.length,
+          file.length,
+          Array.empty,
+          null)
 
       val sharedConf = broadcastedHadoopConf.value.value
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index af0100c..20d0de4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -20,13 +20,12 @@ import java.net.URI
 import java.time.ZoneId
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetInputSplit, ParquetRecordReader}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
@@ -122,14 +121,21 @@ case class ParquetPartitionReaderFactory(
   private def buildReaderBase[T](
       file: PartitionedFile,
       buildReaderFunc: (
-        FileSplit, InternalRow, TaskAttemptContextImpl,
+        ParquetInputSplit, InternalRow, TaskAttemptContextImpl,
           Option[FilterPredicate], Option[ZoneId],
           LegacyBehaviorPolicy.Value,
           LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): RecordReader[Void, T] = {
     val conf = broadcastedConf.value.value
 
     val filePath = new Path(new URI(file.filePath))
-    val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+    val split =
+      new org.apache.parquet.hadoop.ParquetInputSplit(
+        filePath,
+        file.start,
+        file.start + file.length,
+        file.length,
+        Array.empty,
+        null)
 
     lazy val footerFileMetaData =
       ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
@@ -193,7 +199,7 @@ case class ParquetPartitionReaderFactory(
   }
 
   private def createRowBaseParquetReader(
-      split: FileSplit,
+      split: ParquetInputSplit,
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],
@@ -228,7 +234,7 @@ case class ParquetPartitionReaderFactory(
   }
 
   private def createParquetVectorizedReader(
-      split: FileSplit,
+      split: ParquetInputSplit,
       partitionValues: InternalRow,
       hadoopAttemptContext: TaskAttemptContextImpl,
       pushed: Option[FilterPredicate],


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org