Posted to commits@spark.apache.org by gu...@apache.org on 2021/02/24 02:37:33 UTC
[spark] branch master updated: Revert "[SPARK-32703][SQL] Replace deprecated API calls from SpecificParquetRecordReaderBase"
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 80bad08 Revert "[SPARK-32703][SQL] Replace deprecated API calls from SpecificParquetRecordReaderBase"
80bad08 is described below
commit 80bad086c806fd507b1fb197b171f87333f2fb08
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Wed Feb 24 11:36:54 2021 +0900
Revert "[SPARK-32703][SQL] Replace deprecated API calls from SpecificParquetRecordReaderBase"
This reverts commit 27873280ffbd73be6df230b4497701794ac81d91.
---
.../parquet/SpecificParquetRecordReaderBase.java | 93 +++++++++++++++-------
.../datasources/parquet/ParquetFileFormat.scala | 10 ++-
.../v2/parquet/ParquetPartitionReaderFactory.scala | 18 +++--
3 files changed, 85 insertions(+), 36 deletions(-)
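For reference, the core of this revert is the footer-reading path in SpecificParquetRecordReaderBase: it moves back from the HadoopInputFile/ParquetReadOptions API to the deprecated static readFooter helper. The sketch below is not part of the commit; it only contrasts the two styles using the Parquet classes that appear in the diff, with a placeholder file path, an illustrative class name (FooterReadSketch), and the assumption of a parquet-mr version that still ships both APIs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class FooterReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet");   // placeholder path

    // Style restored by the revert: deprecated static helper plus a metadata filter.
    ParquetMetadata footerOld = ParquetFileReader.readFooter(conf, file, NO_FILTER);
    System.out.println("row groups (deprecated API): " + footerOld.getBlocks().size());

    // Style removed by the revert: HadoopInputFile plus ParquetReadOptions.
    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options)) {
      ParquetMetadata footerNew = reader.getFooter();
      System.out.println("row groups (new API): " + footerNew.getBlocks().size());
    }
  }
}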
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 8d7a294..0c82c03 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,33 +32,36 @@ import java.util.Set;
import scala.Option;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.BadConfigurationException;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
@@ -88,16 +92,58 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
- FileSplit split = (FileSplit) inputSplit;
+ ParquetInputSplit split = (ParquetInputSplit)inputSplit;
this.file = split.getPath();
+ long[] rowGroupOffsets = split.getRowGroupOffsets();
+
+ ParquetMetadata footer;
+ List<BlockMetaData> blocks;
- ParquetReadOptions options = HadoopReadOptions
- .builder(configuration)
- .withRange(split.getStart(), split.getStart() + split.getLength())
- .build();
- this.reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
- this.fileSchema = reader.getFileMetaData().getSchema();
- Map<String, String> fileMetadata = reader.getFileMetaData().getKeyValueMetaData();
+ // if task.side.metadata is set, rowGroupOffsets is null
+ if (rowGroupOffsets == null) {
+ // then we need to apply the predicate push down filter
+ footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+ MessageType fileSchema = footer.getFileMetaData().getSchema();
+ FilterCompat.Filter filter = getFilter(configuration);
+ blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+ } else {
+ // SPARK-33532: After SPARK-13883 and SPARK-13989, the parquet read process will
+ // no longer enter this branch because `ParquetInputSplit` is only constructed in
+ // the `ParquetFileFormat.buildReaderWithPartitionValues` and
+ // `ParquetPartitionReaderFactory.buildReaderBase` methods,
+ // and the `rowGroupOffsets` in `ParquetInputSplit` is set to null explicitly.
+ // We didn't delete this branch because PARQUET-131 wanted to move this to the
+ // parquet-mr project.
+ // otherwise we find the row groups that were selected on the client
+ footer = readFooter(configuration, file, NO_FILTER);
+ Set<Long> offsets = new HashSet<>();
+ for (long offset : rowGroupOffsets) {
+ offsets.add(offset);
+ }
+ blocks = new ArrayList<>();
+ for (BlockMetaData block : footer.getBlocks()) {
+ if (offsets.contains(block.getStartingPos())) {
+ blocks.add(block);
+ }
+ }
+ // verify we found them all
+ if (blocks.size() != rowGroupOffsets.length) {
+ long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+ for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+ foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+ }
+ // this should never happen.
+ // provide a good error message in case there's a bug
+ throw new IllegalStateException(
+ "All the offsets listed in the split should be found in the file."
+ + " expected: " + Arrays.toString(rowGroupOffsets)
+ + " found: " + blocks
+ + " out of: " + Arrays.toString(foundRowGroupOffsets)
+ + " in range " + split.getStart() + ", " + split.getEnd());
+ }
+ }
+ this.fileSchema = footer.getFileMetaData().getSchema();
+ Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
@@ -105,6 +151,8 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
String sparkRequestedSchemaString =
configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+ this.reader = new ParquetFileReader(
+ configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
this.totalRowCount = reader.getFilteredRecordCount();
// For test purpose.
@@ -117,7 +165,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
@SuppressWarnings("unchecked")
AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
- intAccum.add(reader.getRowGroups().size());
+ intAccum.add(blocks.size());
}
}
}
@@ -151,21 +199,12 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
*/
protected void initialize(String path, List<String> columns) throws IOException {
Configuration config = new Configuration();
- config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key() , false);
- config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+ config.set("spark.sql.parquet.binaryAsString", "false");
+ config.set("spark.sql.parquet.int96AsTimestamp", "false");
this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
- ParquetReadOptions options = HadoopReadOptions
- .builder(config)
- .withRange(0, length)
- .build();
-
- ParquetMetadata footer;
- try (ParquetFileReader reader = ParquetFileReader
- .open(HadoopInputFile.fromPath(file, config), options)) {
- footer = reader.getFooter();
- }
+ ParquetMetadata footer = readFooter(config, file, range(0, length));
List<BlockMetaData> blocks = footer.getBlocks();
this.fileSchema = footer.getFileMetaData().getSchema();
@@ -188,8 +227,6 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
}
}
this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
- // unfortunately we'd have to create the reader again since there is no column projection
- // in the new API.
this.reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
this.totalRowCount = reader.getFilteredRecordCount();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index fa6e124..64a1ac8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -26,7 +26,6 @@ import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
@@ -260,7 +259,14 @@ class ParquetFileFormat
assert(file.partitionValues.numFields == partitionSchema.size)
val filePath = new Path(new URI(file.filePath))
- val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ file.start,
+ file.start + file.length,
+ file.length,
+ Array.empty,
+ null)
val sharedConf = broadcastedHadoopConf.value.value
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index af0100c..20d0de4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -20,13 +20,12 @@ import java.net.URI
import java.time.ZoneId
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetInputSplit, ParquetRecordReader}
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -122,14 +121,21 @@ case class ParquetPartitionReaderFactory(
private def buildReaderBase[T](
file: PartitionedFile,
buildReaderFunc: (
- FileSplit, InternalRow, TaskAttemptContextImpl,
+ ParquetInputSplit, InternalRow, TaskAttemptContextImpl,
Option[FilterPredicate], Option[ZoneId],
LegacyBehaviorPolicy.Value,
LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): RecordReader[Void, T] = {
val conf = broadcastedConf.value.value
val filePath = new Path(new URI(file.filePath))
- val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ filePath,
+ file.start,
+ file.start + file.length,
+ file.length,
+ Array.empty,
+ null)
lazy val footerFileMetaData =
ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
@@ -193,7 +199,7 @@ case class ParquetPartitionReaderFactory(
}
private def createRowBaseParquetReader(
- split: FileSplit,
+ split: ParquetInputSplit,
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
@@ -228,7 +234,7 @@ case class ParquetPartitionReaderFactory(
}
private def createParquetVectorizedReader(
- split: FileSplit,
+ split: ParquetInputSplit,
partitionValues: InternalRow,
hadoopAttemptContext: TaskAttemptContextImpl,
pushed: Option[FilterPredicate],
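As a usage note, the two restored Scala call sites above both build their split through the six-argument ParquetInputSplit constructor with a null rowGroupOffsets array, which is what routes SpecificParquetRecordReaderBase.initialize into the predicate-pushdown branch. A minimal sketch follows, expressed in Java rather than the Scala of the call sites, with an illustrative class name (SplitSketch) and caller-supplied offsets:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetInputSplit;

public class SplitSketch {
  public static ParquetInputSplit makeSplit(Path filePath, long start, long length) {
    // Same shape as the restored Scala call sites: end = start + length, no host hints,
    // and a null rowGroupOffsets array so the record reader filters row groups itself.
    return new ParquetInputSplit(
        filePath,
        start,
        start + length,
        length,
        new String[0],
        null);
  }
}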