You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/22 18:37:00 UTC

[jira] [Commented] (PARQUET-1228) parquet-format code changes for encryption support

    [ https://issues.apache.org/jira/browse/PARQUET-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659330#comment-16659330 ] 

ASF GitHub Bot commented on PARQUET-1228:
-----------------------------------------

ggershinsky closed pull request #95: PARQUET-1228: parquet-format code changes for encryption support
URL: https://github.com/apache/parquet-format/pull/95
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/src/main/java/org/apache/parquet/format/BlockCipher.java b/src/main/java/org/apache/parquet/format/BlockCipher.java
new file mode 100755
index 00000000..6a18aced
--- /dev/null
+++ b/src/main/java/org/apache/parquet/format/BlockCipher.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.format;
+
+import java.io.IOException;
+
+public interface BlockCipher{ // Encryptor/Decryptor pair that Util's write/read overloads apply to serialized Thrift structures
+
+  public interface Encryptor{
+    /**
+     * Encrypts the entire plaintext array.
+     * Implementations must return an array whose contents start at offset 0 and fill the whole array.
+     * The input plaintext starts at offset 0 and has a length of plaintext.length.
+     * @param plaintext the bytes to encrypt
+     * @return the ciphertext
+     * @throws IOException if encryption fails
+     */
+    public byte[] encrypt(byte[] plaintext) throws IOException;
+
+    /**
+     * Encrypts a region of the plaintext array.
+     * Implementations must return an array whose contents start at offset 0 and fill the whole array.
+     * Only plaintext[offset] .. plaintext[offset + len - 1] are encrypted.
+     * @param plaintext array holding the bytes to encrypt
+     * @param offset index of the first plaintext byte
+     * @param len number of plaintext bytes
+     * @return the ciphertext
+     * @throws IOException if encryption fails
+     */
+    public byte[] encrypt(byte[] plaintext, int offset, int len) throws IOException;
+  }
+
+
+  public interface Decryptor{  
+    /**
+     * Decrypts the entire ciphertext array.
+     * Implementations must return an array whose plaintext starts at offset 0 and fills the whole array.
+     * The input ciphertext starts at offset 0 and has a length of ciphertext.length.
+     * @param ciphertext the bytes to decrypt
+     * @return the plaintext
+     * @throws IOException if decryption fails
+     */
+    public byte[] decrypt(byte[] ciphertext) throws IOException;
+
+    /**
+     * Decrypts a region of the ciphertext array.
+     * Implementations must return an array whose plaintext starts at offset 0 and fills the whole array.
+     * Only ciphertext[offset] .. ciphertext[offset + len - 1] are decrypted.
+     * @param ciphertext array holding the bytes to decrypt
+     * @param offset index of the first ciphertext byte
+     * @param len number of ciphertext bytes
+     * @return the plaintext
+     * @throws IOException if decryption fails
+     */
+    public byte[] decrypt(byte[] ciphertext, int offset, int len) throws IOException;
+  }
+}
+
+
diff --git a/src/main/java/org/apache/parquet/format/Util.java b/src/main/java/org/apache/parquet/format/Util.java
index 55d61ff4..5819da41 100644
--- a/src/main/java/org/apache/parquet/format/Util.java
+++ b/src/main/java/org/apache/parquet/format/Util.java
@@ -30,6 +30,7 @@
 import static org.apache.parquet.format.event.Consumers.listOf;
 import static org.apache.parquet.format.event.Consumers.struct;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -40,7 +41,7 @@
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
-
+import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.parquet.format.event.Consumers.Consumer;
 import org.apache.parquet.format.event.Consumers.DelegatingFieldConsumer;
 import org.apache.parquet.format.event.EventBasedThriftReader;
@@ -58,36 +59,78 @@
 public class Util {
 
   public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
-    write(columnIndex, to);
+    writeColumnIndex(columnIndex, to, (BlockCipher.Encryptor) null); // plaintext (null encryptor)
+  }
+  
+  public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to, BlockCipher.Encryptor encryptor) throws IOException { // encryptor == null writes plaintext; otherwise a 4-byte length prefix plus ciphertext (see write)
+    write(columnIndex, to, encryptor);
   }
 
   public static ColumnIndex readColumnIndex(InputStream from) throws IOException {
-    return read(from, new ColumnIndex());
+    return readColumnIndex(from, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
+  }
+  
+  public static ColumnIndex readColumnIndex(InputStream from, BlockCipher.Decryptor decryptor) throws IOException { // decryptor == null reads plaintext; otherwise expects a 4-byte length prefix plus ciphertext (see decryptStream)
+    return read(from, new ColumnIndex(), decryptor);
   }
 
   public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream to) throws IOException {
-    write(offsetIndex, to);
+    writeOffsetIndex(offsetIndex, to, (BlockCipher.Encryptor) null); // plaintext (null encryptor)
+  }
+  
+  public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream to, BlockCipher.Encryptor encryptor) throws IOException { // encryptor == null writes plaintext; otherwise a 4-byte length prefix plus ciphertext (see write)
+    write(offsetIndex, to, encryptor);
   }
 
   public static OffsetIndex readOffsetIndex(InputStream from) throws IOException {
-    return read(from, new OffsetIndex());
+    return readOffsetIndex(from, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
+  }
+  
+  public static OffsetIndex readOffsetIndex(InputStream from, BlockCipher.Decryptor decryptor) throws IOException { // decryptor == null reads plaintext; otherwise expects a 4-byte length prefix plus ciphertext (see decryptStream)
+    return read(from, new OffsetIndex(), decryptor);
   }
 
   public static void writePageHeader(PageHeader pageHeader, OutputStream to) throws IOException {
-    write(pageHeader, to);
+    writePageHeader(pageHeader, to, (BlockCipher.Encryptor) null); // plaintext (null encryptor)
+  }
+  
+  public static void writePageHeader(PageHeader pageHeader, OutputStream to, BlockCipher.Encryptor encryptor) throws IOException { // encryptor == null writes plaintext; otherwise a 4-byte length prefix plus ciphertext (see write)
+    write(pageHeader, to, encryptor);
   }
 
   public static PageHeader readPageHeader(InputStream from) throws IOException {
-    return read(from, new PageHeader());
+    return readPageHeader(from, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
+  }
+  
+  public static PageHeader readPageHeader(InputStream from, BlockCipher.Decryptor decryptor) throws IOException { // decryptor == null reads plaintext; otherwise expects a 4-byte length prefix plus ciphertext (see decryptStream)
+    return read(from, new PageHeader(), decryptor);
   }
 
   public static void writeFileMetaData(org.apache.parquet.format.FileMetaData fileMetadata, OutputStream to) throws IOException {
-    write(fileMetadata, to);
+    writeFileMetaData(fileMetadata, to, (BlockCipher.Encryptor) null); // plaintext (null encryptor)
+  }
+  
+  public static void writeFileMetaData(org.apache.parquet.format.FileMetaData fileMetadata, OutputStream to,
+      BlockCipher.Encryptor encryptor) throws IOException { // encryptor == null writes plaintext; otherwise a 4-byte length prefix plus ciphertext (see write)
+    write(fileMetadata, to, encryptor);
   }
 
   public static FileMetaData readFileMetaData(InputStream from) throws IOException {
-    return read(from, new FileMetaData());
+    return readFileMetaData(from, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
+  }
+  
+  public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryptor decryptor) throws IOException { // decryptor == null reads plaintext; otherwise expects a 4-byte length prefix plus ciphertext (see decryptStream)
+    return read(from, new FileMetaData(), decryptor);
+  }
+  
+  public static void writeColumnMetaData(ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor) throws IOException { // no 2-arg overload; pass a null encryptor to write plaintext
+    write(columnMetaData, to, encryptor);
+  }
+  
+  public static ColumnMetaData readColumnMetaData(InputStream from, BlockCipher.Decryptor decryptor) throws IOException { // no 1-arg overload; pass a null decryptor to read plaintext
+    return read(from, new ColumnMetaData(), decryptor);
   }
+  
   /**
    * reads the meta data from the stream
    * @param from the stream to read the metadata from
@@ -96,14 +139,25 @@ public static FileMetaData readFileMetaData(InputStream from) throws IOException
    * @throws IOException
    */
   public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups) throws IOException {
+    return readFileMetaData(from, skipRowGroups, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
+  }
+  public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor) throws IOException { // decryptor may be null (plaintext); it is forwarded to whichever reader path is taken below
     FileMetaData md = new FileMetaData();
     if (skipRowGroups) {
-      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups);
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor); // streaming path; decryption is applied by the 4-arg consumer overload
     } else {
-      read(from, md);
+      read(from, md, decryptor); // whole-struct path; decryption is applied by read()
     }
     return md;
   }
+  
+  public static void writeFileCryptoMetaData(org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException { // always written in plaintext (null encryptor)
+    write(cryptoMetadata, to, (BlockCipher.Encryptor) null);
+  }
+  
+  public static FileCryptoMetaData readFileCryptoMetaData(InputStream from) throws IOException { // always read as plaintext (null decryptor)
+    return read(from, new FileCryptoMetaData(), (BlockCipher.Decryptor) null);
+  }
 
   /**
    * To read metadata in a streaming fashion.
@@ -165,10 +219,18 @@ public void addKeyValueMetaData(KeyValue kv) {
   }
 
   public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer) throws IOException {
-    readFileMetaData(from, consumer, false);
+    readFileMetaData(from, consumer, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
   }
-
+  public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer, BlockCipher.Decryptor decryptor) throws IOException { // reads all fields (skipRowGroups = false); decryptor may be null
+    readFileMetaData(from, consumer, false, decryptor);
+  }
+  
   public static void readFileMetaData(InputStream from, final FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException {
+    readFileMetaData(from, consumer, skipRowGroups, (BlockCipher.Decryptor) null); // plaintext (null decryptor)
+  }
+
+  public static void readFileMetaData(InputStream input, final FileMetaDataConsumer consumer, 
+      boolean skipRowGroups, BlockCipher.Decryptor decryptor) throws IOException {
     try {
       DelegatingFieldConsumer eventConsumer = fieldConsumer()
       .onField(VERSION, new I32Consumer() {
@@ -205,8 +267,9 @@ public void consume(RowGroup rowGroup) {
           }
         })));
       }
+      
+      InputStream from = (null == decryptor? input : decryptStream(input, decryptor));
       new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
-
     } catch (TException e) {
       throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
     }
@@ -223,8 +286,41 @@ private static TProtocol protocol(InputStream from) {
   private static InterningProtocol protocol(TIOStreamTransport t) {
     return new InterningProtocol(new TCompactProtocol(t));
   }
+  
+  private static InputStream decryptStream(InputStream from, BlockCipher.Decryptor decryptor) throws IOException { // Reads one length-prefixed encrypted block from 'from' and returns its decrypted bytes as an in-memory stream
+    byte[] i32rd = new byte[4];
+    int got = 0;
+    // Read the 4-byte length prefix of the encrypted Thrift structure; loop because read() may return fewer bytes than requested
+    while (got < 4) {
+      int n = from.read(i32rd, got, 4-got);
+      if (n <= 0) {
+        throw new IOException("Tried to read int (4 bytes), but only got " + got + " bytes.");
+      }
+      got += n;
+    }
+    int clen = // little-endian: i32rd[0] is the least significant byte (same layout write() produces)
+        ((i32rd[3] & 0xff) << 24) |
+        ((i32rd[2] & 0xff) << 16) |
+        ((i32rd[1] & 0xff) <<  8) |
+        ((i32rd[0] & 0xff));
+    if (clen < 1) throw new IOException("Wrong length of encrypted metadata: " + clen); // NOTE(review): only a lower bound is checked
+    byte[] cbuf = new byte[clen]; // NOTE(review): size comes straight from the stream; a corrupt or hostile prefix can request up to ~2 GiB — consider bounding clen
+    got = 0;
+    // Read exactly clen bytes of ciphertext; a stream that ends early is reported as an IOException
+    while (got < clen) {
+      int n = from.read(cbuf, got, clen - got);
+      if (n <= 0) {
+        throw new IOException("Tried to read " + clen + " bytes, but only got " + got + " bytes.");
+      }
+      got += n;
+    }
+    // Decrypt the whole buffer and expose the plaintext as a stream that callers hand to protocol(...)
+    byte[] pbuf = decryptor.decrypt(cbuf, 0, clen);
+    return new ByteArrayInputStream(pbuf);
+  }
 
-  private static <T extends TBase<?,?>> T read(InputStream from, T tbase) throws IOException {
+  private static <T extends TBase<?,?>> T read(InputStream input, T tbase, BlockCipher.Decryptor decryptor) throws IOException {
+    InputStream from = (null == decryptor? input : decryptStream(input, decryptor));
     try {
       tbase.read(protocol(from));
       return tbase;
@@ -233,11 +329,36 @@ private static InterningProtocol protocol(TIOStreamTransport t) {
     }
   }
 
-  private static void write(TBase<?, ?> tbase, OutputStream to) throws IOException {
+  private static void write(TBase<?, ?> tbase, OutputStream to, BlockCipher.Encryptor encryptor) throws IOException { // with a non-null encryptor, emits a 4-byte little-endian length prefix followed by the encrypted compact-protocol bytes
+    if (null == encryptor) { // plaintext path: serialize straight to the stream
+      try {
+        tbase.write(protocol(to));
+        return;
+      } catch (TException e) {
+        throw new IOException("can not write " + tbase, e);
+      }
+    }
+    // Encrypted path: serialize into memory first, because the ciphertext length must be known before anything is written
+    TMemoryBuffer tmb = new TMemoryBuffer(100); // presumably just an initial size hint — TODO confirm the buffer grows for larger structures
     try {
-      tbase.write(protocol(to));
+      tbase.write(new InterningProtocol(new TCompactProtocol(tmb))); // same protocol stack as protocol(TIOStreamTransport), but over the memory buffer
     } catch (TException e) {
       throw new IOException("can not write " + tbase, e);
     }
+    byte[] pbuf = tmb.getArray(); // backing array; may be longer than the serialized data, hence plen below
+    int plen = tmb.length();
+    byte[] cbuf = encryptor.encrypt(pbuf, 0, plen); // encrypt only the first plen (valid) bytes
+    int clen = cbuf.length; // Encryptor contract: the returned array is filled entirely by the ciphertext
+    // Serialize the ciphertext length as a 4-byte little-endian int (decryptStream reads it back the same way)
+    byte[] i32out = new byte[4];
+    i32out[3] = (byte)(0xff & (clen >> 24));
+    i32out[2] = (byte)(0xff & (clen >> 16));
+    i32out[1] = (byte)(0xff & (clen >> 8));
+    i32out[0] = (byte)(0xff & (clen));
+    // Write prefix + ciphertext and flush; NOTE(review): the plaintext path above returns without flushing
+    to.write(i32out, 0, 4);
+    to.write(cbuf, 0, clen);
+    to.flush();
   }
 }
+


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> parquet-format code changes for encryption support
> --------------------------------------------------
>
>                 Key: PARQUET-1228
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1228
>             Project: Parquet
>          Issue Type: Sub-task
>          Components: parquet-format
>            Reporter: Gidon Gershinsky
>            Assignee: Gidon Gershinsky
>            Priority: Major
>              Labels: pull-request-available
>
> # Write/read the new Thrift structures (crypto metadata)
> # Add encryption/decryption of Parquet metadata Thrift structures



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)