Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2023/01/13 11:19:27 UTC

[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1069182893


##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -51,17 +52,26 @@
      * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
      * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
      */
-    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
     /**
+     * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+     *
+     * @param from ByteBuffer with length and ciphertext.
+     * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+     * @return plaintext -  starts at offset 0 of the output, and fills up the entire byte buffer.

Review Comment:
   Done



##########
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##########
@@ -36,11 +37,11 @@
      * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the 
      * Parquet Modular Encryption specification.
      */
-    public byte[] encrypt(byte[] plaintext, byte[] AAD);
+    byte[] encrypt(byte[] plaintext, byte[] AAD);

Review Comment:
   Probably. This PR only focuses on the read path, though. I can add the encryptor API when I look at the write path.



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
     return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java:
##########
@@ -144,6 +144,11 @@
    */
   public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
 
+  /**
+    * Key to configure if off-heap buffer should be used for decryption
+    */
+  public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled";

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {

Review Comment:
   Actually, it does. Added it for V2 and updated the unit tests as well. 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -242,6 +245,12 @@ public Builder<T> withFilter(Filter filter) {
       return this;
     }
 
+    public Builder<T> withAllocator(ByteBufferAllocator allocator) {

Review Comment:
   Well, yes. `ParquetReader` has an instance of `ParquetReadOptions`, and this builder simply passes the allocator down to that instance. Adding it here mirrors the other options set in `ParquetReader.Builder`. It is used in the unit tests.
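
A minimal sketch of the pass-through pattern described above, for readers without the full source at hand. Every type here (`SketchAllocator`, `SketchReadOptions`, `SketchReaderBuilder`) is a hypothetical stand-in and not the parquet-mr API; only the shape (a builder field defaulting to a heap allocator, a fluent `withAllocator` setter, and the value being handed to the options object) follows the diff.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins for illustration; these are NOT the parquet-mr classes.
interface SketchAllocator {
  ByteBuffer allocate(int size);
  boolean isDirect();
}

final class SketchHeapAllocator implements SketchAllocator {
  public ByteBuffer allocate(int size) { return ByteBuffer.allocate(size); }
  public boolean isDirect() { return false; }
}

final class SketchDirectAllocator implements SketchAllocator {
  public ByteBuffer allocate(int size) { return ByteBuffer.allocateDirect(size); }
  public boolean isDirect() { return true; }
}

// Plays the role of the options object that the reader holds.
final class SketchReadOptions {
  private final SketchAllocator allocator;
  SketchReadOptions(SketchAllocator allocator) { this.allocator = allocator; }
  SketchAllocator getAllocator() { return allocator; }
}

final class SketchReaderBuilder {
  // Defaults to heap allocation, like the builder field added in the diff.
  private SketchAllocator allocator = new SketchHeapAllocator();

  SketchReaderBuilder withAllocator(SketchAllocator allocator) {
    this.allocator = allocator;
    return this; // fluent style, matching the other builder setters
  }

  // The builder does nothing with the allocator except hand it to the options.
  SketchReadOptions buildOptions() { return new SketchReadOptions(allocator); }
}
```

A caller that never calls `withAllocator` gets heap buffers; a test that wants direct memory passes a direct allocator explicitly.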



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##########
@@ -133,11 +135,36 @@ public DataPage readPage() {
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput bytes = dataPageV1.getBytes();
-            if (null != blockDecryptor) {
-              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            BytesInput decompressed;
+
+            if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+              ByteBuffer byteBuffer = bytes.toByteBuffer();
+              if (!byteBuffer.isDirect()) {

Review Comment:
   I think there was a previous discussion on this and we simplified the code to throw an error. Basically, we expect that a user of the `ParquetReader` will be using either a direct buffer or a heap buffer all the way through, so encountering both a direct buffer and a heap buffer is an error.
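
The fail-fast policy described above could look roughly like the sketch below. `BufferKindCheck` and `requireKind` are hypothetical names introduced for illustration, and `IllegalStateException` is an assumed exception type; the actual check and exception used in the PR are not shown in this hunk.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of parquet-mr: rejects a buffer whose kind
// (direct vs. heap) differs from what the read path was configured for.
final class BufferKindCheck {
  private BufferKindCheck() {}

  /**
   * Returns the buffer unchanged if it matches the expected kind; otherwise
   * throws instead of silently copying between heap and direct memory.
   */
  static ByteBuffer requireKind(ByteBuffer buffer, boolean expectDirect) {
    if (buffer.isDirect() != expectDirect) {
      throw new IllegalStateException(
          "Expected a " + (expectDirect ? "direct" : "heap")
              + " buffer but got a " + (buffer.isDirect() ? "direct" : "heap") + " buffer");
    }
    return buffer;
  }
}
```

Throwing here keeps the read path simple: there is no fallback branch that copies data, so a misconfigured allocator shows up immediately rather than as a hidden performance cost.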



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##########
@@ -187,6 +189,7 @@ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
     private final InputFile file;
     private final Path path;
     private Filter filter = null;
+    private ByteBufferAllocator  allocator = new HeapByteBufferAllocator();

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java:
##########
@@ -78,6 +79,38 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
     return plainText;
   }
 
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {

Review Comment:
   Done



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
     return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+
+    int cipherTextOffset = SIZE_LENGTH;
+    int cipherTextLength = ciphertext.limit() - ciphertext.position() - SIZE_LENGTH;
+
+    int plainTextLength = cipherTextLength - NONCE_LENGTH;
+    if (plainTextLength < 1) {
+      throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
+    }
+
+    // skip size
+    ciphertext.position(ciphertext.position() + cipherTextOffset);
+    // Get the nonce from ciphertext
+    ciphertext.get(ctrIV, 0, NONCE_LENGTH);
+
+    // Reuse the input buffer as the output buffer
+    ByteBuffer plainText = ciphertext.slice();
+    plainText.limit(plainTextLength);
+    int inputLength = cipherTextLength - NONCE_LENGTH;
+    int inputOffset = cipherTextOffset + NONCE_LENGTH;
+    try {
+      IvParameterSpec spec = new IvParameterSpec(ctrIV);
+      cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+
+      // Breaking decryption into multiple updates, to trigger h/w acceleration in Java 9+
+      while (inputLength > CHUNK_LENGTH) {
+        ciphertext.position(inputOffset);
+        ciphertext.limit(inputOffset + CHUNK_LENGTH);
+        cipher.update(ciphertext, plainText);
+        inputOffset += CHUNK_LENGTH;
+        inputLength -= CHUNK_LENGTH;
+      }
+      ciphertext.position(inputOffset);
+      ciphertext.limit(inputOffset + inputLength);
+      cipher.doFinal(ciphertext, plainText);
+      plainText.flip();
+    } catch (GeneralSecurityException e) {
+      throw new ParquetCryptoRuntimeException("Failed to decrypt", e);

Review Comment:
   The original implementation in `decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)` words the message the same way (without the trailing space), so I'm leaving it as is for consistency.
   If you want, we can change both. I would rather we terminate the message with a `.` (period).
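
For readers following the hunk above, the chunked `update` loop can be tried in isolation with the sketch below. It is not the parquet-mr implementation: the constant values, the IV layout (12-byte nonce followed by a counter starting at 1) and the class name `ChunkedCtrSketch` are assumptions made for illustration, and unlike the diff it writes into a separate output buffer rather than reusing the input buffer.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Illustrative sketch only; not the parquet-mr implementation.
final class ChunkedCtrSketch {
  // Assumed values; the diff references these names without showing their values.
  static final int NONCE_LENGTH = 12;
  static final int CTR_IV_LENGTH = 16;
  static final int CHUNK_LENGTH = 4 * 1024;

  private ChunkedCtrSketch() {}

  /** Builds a 16-byte CTR IV: nonce first, counter initialised to 1 (an assumption). */
  static byte[] ivFromNonce(byte[] nonce) {
    byte[] iv = new byte[CTR_IV_LENGTH];
    System.arraycopy(nonce, 0, iv, 0, NONCE_LENGTH);
    iv[CTR_IV_LENGTH - 1] = 1;
    return iv;
  }

  /**
   * Runs AES-CTR over input[position, limit) in CHUNK_LENGTH pieces.
   * mode is Cipher.ENCRYPT_MODE or Cipher.DECRYPT_MODE.
   * Returns a new buffer, flipped and ready to read; the caller's buffer is not moved.
   */
  static ByteBuffer process(int mode, byte[] key, byte[] nonce, ByteBuffer input) {
    try {
      Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
      cipher.init(mode, new SecretKeySpec(key, "AES"), new IvParameterSpec(ivFromNonce(nonce)));

      ByteBuffer in = input.duplicate(); // independent position and limit
      int end = in.limit();
      ByteBuffer out = in.isDirect()
          ? ByteBuffer.allocateDirect(in.remaining())
          : ByteBuffer.allocate(in.remaining());

      // Full chunks go through update(); the last (possibly short) piece goes to doFinal().
      while (end - in.position() > CHUNK_LENGTH) {
        in.limit(in.position() + CHUNK_LENGTH);
        cipher.update(in, out); // consumes exactly one chunk and advances in.position()
        in.limit(end);
      }
      cipher.doFinal(in, out);
      out.flip();
      return out;
    } catch (GeneralSecurityException e) {
      throw new IllegalStateException("Failed to process buffer", e);
    }
  }
}
```

Because CTR is symmetric, calling `process` with `Cipher.ENCRYPT_MODE` and then `Cipher.DECRYPT_MODE` using the same key and nonce round-trips the data. Inputs no longer than `CHUNK_LENGTH` never enter the loop, so a test needs larger inputs to exercise it.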
   



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##########
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
     return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {

Review Comment:
   I've tried to, but the changes are rather intertwined and we'll end up with multiple functions abstracting out the common lines of code, with none of the functions seeming to be a complete unit by itself. I feel the current version is easier to read and maintain.



##########
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##########
@@ -202,7 +211,7 @@ public static Collection<Object[]> data() {
 
   private static final boolean plaintextFilesAllowed = true;
 
-  private static final int ROW_COUNT = 10000;
+  private static final int ROW_COUNT = 100;

Review Comment:
   TBH, I forget why. But on looking at it again, and rerunning the tests, I've _increased_ the value to test a part of the `AesCtrDecryptor` that was not getting tested before.


