You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hu...@apache.org on 2021/10/29 21:52:24 UTC
[spark] branch master updated: [SPARK-37117][SQL] Fix reading
encrypted parquet files with external key material
This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6fd93b3 [SPARK-37117][SQL] Fix reading encrypted parquet files with external key material
6fd93b3 is described below
commit 6fd93b3de7459bd23067c1674fdc73d893d86a65
Author: Gidon Gershinsky <gg...@apple.com>
AuthorDate: Fri Oct 29 14:41:24 2021 -0700
[SPARK-37117][SQL] Fix reading encrypted parquet files with external key material
### What changes were proposed in this pull request?
Bug fix
### Why are the changes needed?
Parquet encryption has a number of modes. One of them is "external key material", which keeps encrypted data keys in a separate file (as opposed to inside the Parquet file). Upon reading, the Spark Parquet connector does not pass the file path, which causes an NPE.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A unit test was added.
Closes #34415 from ggershinsky/fix-320-npe-ext-kmat-pme.
Authored-by: Gidon Gershinsky <gg...@apple.com>
Signed-off-by: Huaxin Gao <hu...@apple.com>
---
.../datasources/parquet/ParquetFooterReader.java | 3 ++-
.../parquet/SpecificParquetRecordReaderBase.java | 4 ++--
.../spark/sql/hive/ParquetEncryptionSuite.scala | 27 ++++++++++++++++++++++
3 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
index ab07b19..ea4c89e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
@@ -47,7 +47,8 @@ public class ParquetFooterReader {
private static ParquetMetadata readFooter(HadoopInputFile inputFile,
ParquetMetadataConverter.MetadataFilter filter) throws IOException {
ParquetReadOptions readOptions =
- HadoopReadOptions.builder(inputFile.getConfiguration()).withMetadataFilter(filter).build();
+ HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath())
+ .withMetadataFilter(filter).build();
// Use try-with-resources to ensure fd is closed.
try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) {
return fileReader.getFooter();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index f5ae6cf..ccfe379 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -88,7 +88,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
this.file = split.getPath();
ParquetReadOptions options = HadoopReadOptions
- .builder(configuration)
+ .builder(configuration, file)
.withRange(split.getStart(), split.getStart() + split.getLength())
.build();
ParquetFileReader fileReader = new ParquetFileReader(
@@ -157,7 +157,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
ParquetReadOptions options = HadoopReadOptions
- .builder(config)
+ .builder(config, file)
.withRange(0, length)
.build();
ParquetFileReader fileReader = ParquetFileReader.open(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
index 184ccad..24107f0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -64,6 +64,33 @@ class ParquetEncryptionSuite extends QueryTest with TestHiveSingleton {
}
}
+ test("SPARK-37117: Can't read files in Parquet encryption external key material mode") {
+ withTempDir { dir =>
+ withSQLConf(
+ "parquet.crypto.factory.class" ->
+ "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory",
+ "parquet.encryption.kms.client.class" ->
+ "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+ "parquet.encryption.key.material.store.internally" ->
+ "false",
+ "parquet.encryption.key.list" ->
+ s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+ val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c")
+ val parquetDir = new File(dir, "parquet").getCanonicalPath
+ inputDF.write
+ .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
+ .option("parquet.encryption.footer.key", "footerKey")
+ .parquet(parquetDir)
+
+ val parquetDF = spark.read.parquet(parquetDir)
+ assert(parquetDF.inputFiles.nonEmpty)
+ val readDataset = parquetDF.select("a", "b", "c")
+ checkAnswer(readDataset, inputDF)
+ }
+ }
+ }
+
/**
* Verify that the directory contains an encrypted parquet in
* encrypted footer mode by means of checking for all the parquet part files
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org