Posted to commits@spark.apache.org by hu...@apache.org on 2021/10/29 21:52:24 UTC

[spark] branch master updated: [SPARK-37117][SQL] Fix reading encrypted parquet files with external key material

This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd93b3  [SPARK-37117][SQL] Fix reading encrypted parquet files with external key material
6fd93b3 is described below

commit 6fd93b3de7459bd23067c1674fdc73d893d86a65
Author: Gidon Gershinsky <gg...@apple.com>
AuthorDate: Fri Oct 29 14:41:24 2021 -0700

    [SPARK-37117][SQL] Fix reading encrypted parquet files with external key material
    
    ### What changes were proposed in this pull request?
    Bug fix: pass the Parquet file path when building the read options, so that encrypted files with external key material can be read.
    
    ### Why are the changes needed?
    Parquet encryption supports several modes. One of them is "external key material", which keeps the encrypted data keys in a separate file next to the Parquet file (as opposed to inside the Parquet file itself). Upon reading, the Spark Parquet connector did not pass the file path when building the read options, so the external key material file could not be located and reading failed with a NullPointerException (NPE).
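    
    For illustration, a minimal sketch of the configuration that enables this
    mode, based on the test added below (`df`, `path`, and the base64-encoded
    key values `footerKey`, `key1`, `key2` are assumed to be defined as in the
    test suite; InMemoryKMS is Parquet's mock KMS for testing):
    
    ```scala
    // Properties-driven Parquet encryption with the mock in-memory KMS.
    spark.conf.set("parquet.crypto.factory.class",
      "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
    spark.conf.set("parquet.encryption.kms.client.class",
      "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
    // "false" selects external key material: encrypted data keys are kept in
    // a small JSON file next to each parquet file instead of in the footer.
    spark.conf.set("parquet.encryption.key.material.store.internally", "false")
    spark.conf.set("parquet.encryption.key.list",
      s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}")
    
    df.write
      .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
      .option("parquet.encryption.footer.key", "footerKey")
      .parquet(path)
    
    // Before this fix, the read path built its options without the file path,
    // so locating the external key material failed with an NPE.
    spark.read.parquet(path).show()
    ```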
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    A unit test was added.
    
    Closes #34415 from ggershinsky/fix-320-npe-ext-kmat-pme.
    
    Authored-by: Gidon Gershinsky <gg...@apple.com>
    Signed-off-by: Huaxin Gao <hu...@apple.com>
---
 .../datasources/parquet/ParquetFooterReader.java   |  3 ++-
 .../parquet/SpecificParquetRecordReaderBase.java   |  4 ++--
 .../spark/sql/hive/ParquetEncryptionSuite.scala    | 27 ++++++++++++++++++++++
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
index ab07b19..ea4c89e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java
@@ -47,7 +47,8 @@ public class ParquetFooterReader {
   private static ParquetMetadata readFooter(HadoopInputFile inputFile,
       ParquetMetadataConverter.MetadataFilter filter) throws IOException {
     ParquetReadOptions readOptions =
-      HadoopReadOptions.builder(inputFile.getConfiguration()).withMetadataFilter(filter).build();
+      HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath())
+        .withMetadataFilter(filter).build();
     // Use try-with-resources to ensure fd is closed.
     try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) {
       return fileReader.getFooter();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index f5ae6cf..ccfe379 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -88,7 +88,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     this.file = split.getPath();
 
     ParquetReadOptions options = HadoopReadOptions
-      .builder(configuration)
+      .builder(configuration, file)
       .withRange(split.getStart(), split.getStart() + split.getLength())
       .build();
     ParquetFileReader fileReader = new ParquetFileReader(
@@ -157,7 +157,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
 
     ParquetReadOptions options = HadoopReadOptions
-      .builder(config)
+      .builder(config, file)
       .withRange(0, length)
       .build();
     ParquetFileReader fileReader = ParquetFileReader.open(
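
The common thread in the two Java files above: `HadoopReadOptions.builder` has a
two-argument overload that records the file path in the resulting
`ParquetReadOptions`, and Parquet's key tools use that path to locate the key
material file stored alongside the data file. A minimal sketch of the
difference, assuming parquet-hadoop 1.12 or later (where the two-argument
overload exists) and a hypothetical file path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.format.converter.ParquetMetadataConverter

val conf = new Configuration()
val file = new Path("/tmp/data/part-00000.parquet") // hypothetical path

// Path-less builder: fine for plain files, but with external key material
// there is no way to locate the sibling key material file, hence the NPE.
// val options = HadoopReadOptions.builder(conf).build()

// Path-aware builder: the file location travels with the options, so the
// decryption properties factory can resolve the external key material.
val options = HadoopReadOptions.builder(conf, file)
  .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
  .build()
```
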
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
index 184ccad..24107f0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -64,6 +64,33 @@ class ParquetEncryptionSuite extends QueryTest with TestHiveSingleton {
     }
   }
 
+  test("SPARK-37117: Can't read files in Parquet encryption external key material mode") {
+    withTempDir { dir =>
+      withSQLConf(
+        "parquet.crypto.factory.class" ->
+          "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory",
+        "parquet.encryption.kms.client.class" ->
+          "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+        "parquet.encryption.key.material.store.internally" ->
+          "false",
+        "parquet.encryption.key.list" ->
+          s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+        val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c")
+        val parquetDir = new File(dir, "parquet").getCanonicalPath
+        inputDF.write
+          .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
+          .option("parquet.encryption.footer.key", "footerKey")
+          .parquet(parquetDir)
+
+        val parquetDF = spark.read.parquet(parquetDir)
+        assert(parquetDF.inputFiles.nonEmpty)
+        val readDataset = parquetDF.select("a", "b", "c")
+        checkAnswer(readDataset, inputDF)
+      }
+    }
+  }
+
   /**
    * Verify that the directory contains an encrypted parquet in
    * encrypted footer mode by means of checking for all the parquet part files
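
For completeness: the `footerKey`, `key1` and `key2` values referenced by the
new test are defined elsewhere in ParquetEncryptionSuite and are not part of
this diff. A hypothetical sketch of such definitions (InMemoryKMS expects
base64-encoded master keys; the literal key bytes below are made up):

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical 16-byte (AES-128) master keys, base64-encoded as the mock
// InMemoryKMS expects. The real values live in ParquetEncryptionSuite.
val encoder = Base64.getEncoder
val footerKey =
  encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
val key1 =
  encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8))
val key2 =
  encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
```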
