You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/04/24 21:29:14 UTC
[spark] branch master updated: [SPARK-34990][SQL][TESTS] Add
ParquetEncryptionSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 166cc62 [SPARK-34990][SQL][TESTS] Add ParquetEncryptionSuite
166cc62 is described below
commit 166cc6204c96665e7b568cfcc8ba243e79dbf837
Author: Maya Anderson <ma...@il.ibm.com>
AuthorDate: Sat Apr 24 14:28:00 2021 -0700
[SPARK-34990][SQL][TESTS] Add ParquetEncryptionSuite
### What changes were proposed in this pull request?
A simple test that writes and reads an encrypted parquet file and verifies that it is encrypted by checking its magic string (in encrypted footer mode).
### Why are the changes needed?
To provide a test coverage for Parquet encryption.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- [x] [SBT / Hadoop 3.2 / Java8 (the default)](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137785/testReport)
- [ ] ~SBT / Hadoop 3.2 / Java11 by adding [test-java11] to the PR title.~ (Jenkins Java11 build is broken due to missing JDK11 installation)
- [x] [SBT / Hadoop 2.7 / Java8 by adding [test-hadoop2.7] to the PR title.](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/137836/testReport)
- [x] Maven / Hadoop 3.2 / Java8 by adding [test-maven] to the PR title.
- [x] Maven / Hadoop 2.7 / Java8 by adding [test-maven][test-hadoop2.7] to the PR title.
Closes #32146 from andersonm-ibm/pme_testing.
Authored-by: Maya Anderson <ma...@il.ibm.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
sql/hive/pom.xml | 7 ++
.../spark/sql/hive/ParquetEncryptionSuite.scala | 96 ++++++++++++++++++++++
2 files changed, 103 insertions(+)
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 4108d0f..729d3f4 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -72,6 +72,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!--
<dependency>
<groupId>com.google.guava</groupId>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
new file mode 100644
index 0000000..184ccad
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetEncryptionSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.io.RandomAccessFile
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+/**
+ * A test suite that tests parquet modular encryption usage.
+ */
+class ParquetEncryptionSuite extends QueryTest with TestHiveSingleton {
+ import spark.implicits._
+
+ private val encoder = Base64.getEncoder
+ private val footerKey =
+ encoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
+ private val key1 = encoder.encodeToString("1234567890123450".getBytes(StandardCharsets.UTF_8))
+ private val key2 = encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
+
+ test("SPARK-34990: Write and read an encrypted parquet") {
+ withTempDir { dir =>
+ withSQLConf(
+ "parquet.crypto.factory.class" ->
+ "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory",
+ "parquet.encryption.kms.client.class" ->
+ "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
+ "parquet.encryption.key.list" ->
+ s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+
+ val inputDF = Seq((1, 22, 333)).toDF("a", "b", "c")
+ val parquetDir = new File(dir, "parquet").getCanonicalPath
+ inputDF.write
+ .option("parquet.encryption.column.keys", "key1: a, b; key2: c")
+ .option("parquet.encryption.footer.key", "footerKey")
+ .parquet(parquetDir)
+
+ verifyParquetEncrypted(parquetDir)
+
+ val parquetDF = spark.read.parquet(parquetDir)
+ assert(parquetDF.inputFiles.nonEmpty)
+ val readDataset = parquetDF.select("a", "b", "c")
+ checkAnswer(readDataset, inputDF)
+ }
+ }
+ }
+
+ /**
+ * Verify that the directory contains parquet files encrypted in
+ * encrypted footer mode, by checking that every parquet part file
+ * in the directory starts with the magic string PARE, as defined in the spec:
+ * https://github.com/apache/parquet-format/blob/master/Encryption.md#54-encrypted-footer-mode
+ */
+ private def verifyParquetEncrypted(parquetDir: String): Unit = {
+ val parquetPartitionFiles = getListOfParquetFiles(new File(parquetDir))
+ assert(parquetPartitionFiles.size >= 1)
+ parquetPartitionFiles.foreach { parquetFile =>
+ val magicString = "PARE"
+ val magicStringLength = magicString.length()
+ val byteArray = new Array[Byte](magicStringLength)
+ val randomAccessFile = new RandomAccessFile(parquetFile, "r")
+ try {
+ randomAccessFile.read(byteArray, 0, magicStringLength)
+ } finally {
+ randomAccessFile.close()
+ }
+ val stringRead = new String(byteArray, StandardCharsets.UTF_8)
+ assert(magicString == stringRead)
+ }
+ }
+
+ private def getListOfParquetFiles(dir: File): List[File] = {
+ dir.listFiles.filter(_.isFile).toList.filter { file =>
+ file.getName.endsWith("parquet")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org