Posted to commits@orc.apache.org by do...@apache.org on 2021/09/12 18:35:10 UTC

[orc] branch branch-1.6 updated: ORC-991: Fix the bug of encrypted column read crash (#905)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 1b98095  ORC-991: Fix the bug of encrypted column read crash (#905)
1b98095 is described below

commit 1b98095953fd0da0edfe4509e7140d6acdd82cfc
Author: Guiyanakaung <gu...@gmail.com>
AuthorDate: Sun Sep 12 11:34:49 2021 -0700

    ORC-991: Fix the bug of encrypted column read crash (#905)
    
    1. The row index stream was never marked as encrypted. Passing the column's encryption variant to the StreamName constructor marks the row index as encrypted when needed; a sketch of the idea follows the excerpt.
    
    TreeWriterBase.java
    ```java
      public void writeStripe(int requiredIndexEntries) throws IOException {
          .....
    -    context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
    +    context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX, encryption), rowIndex);
          .....
      }
    ```
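    
    For context, here is a rough, self-contained sketch (simplified stand-in types, not ORC's real StreamName/EncryptionVariant API) of what the extra constructor argument changes: a name built with the encryption variant is treated as encrypted, while the two-argument form always looks unencrypted.
    ```java
    public class StreamNameSketch {
      private final int column;
      private final Object encryption; // stands in for an EncryptionVariant; null = unencrypted
    
      StreamNameSketch(int column) {
        this(column, null);
      }
    
      StreamNameSketch(int column, Object encryption) {
        this.column = column;
        this.encryption = encryption;
      }
    
      boolean isEncrypted() {
        return encryption != null;
      }
    
      public static void main(String[] args) {
        Object variant = new Object(); // pretend this is the column's key variant
        // Before the fix: the row index name was built without the variant,
        // so it always looked unencrypted, even for an encrypted column.
        System.out.println(new StreamNameSketch(1).isEncrypted());          // false
        // After the fix: the writer passes the variant through.
        System.out.println(new StreamNameSketch(1, variant).isEncrypted()); // true
      }
    }
    ```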
    
    2. findStreams in StripePlanner.java accumulates stream offsets in an order that is inconsistent with the order in which the streams are written.
    finalizeStripe in PhysicalFsWriter.java writes the streams in this order:
    > 1. write the unencrypted index streams
    > 2. write the encrypted index streams
    > 3. write the unencrypted data streams
    > 4. write the encrypted data streams
    ```java
      @Override
      public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
                                 OrcProto.StripeInformation.Builder dirEntry
                                 ) throws IOException {
        .....
        // write the unencrypted index streams
        unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
        // write the encrypted index streams
        for (VariantTracker variant: variants.values()) {
          variant.writeStreams(StreamName.Area.INDEX, rawWriter);
        }
    
        // write the unencrypted data streams
        unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
        // write out the encrypted data streams
        for (VariantTracker variant: variants.values()) {
          variant.writeStreams(StreamName.Area.DATA, rawWriter);
        }
        .....
      }
    ```
    
    findStreams in StripePlanner.java, however, accumulated the offsets in this order:
    > 1. all the unencrypted index/data streams
    > 2. all the encrypted index/data streams
    
    ```java
      private void findStreams(long streamStart,
                               OrcProto.StripeFooter footer,
                               boolean[] columnInclude) throws IOException {
        long currentOffset = streamStart;
        Arrays.fill(bloomFilterKinds, null);
        for(OrcProto.Stream stream: footer.getStreamsList()) {
          currentOffset += handleStream(currentOffset, columnInclude, stream, null);
        }
    
        // Add the encrypted streams that we are using
        for(ReaderEncryptionVariant variant: encryption.getVariants()) {
          int variantId = variant.getVariantId();
          OrcProto.StripeEncryptionVariant stripeVariant =
              footer.getEncryption(variantId);
          for(OrcProto.Stream stream: stripeVariant.getStreamsList()) {
            currentOffset += handleStream(currentOffset, columnInclude, stream, variant);
          }
        }
      }
    ```
    
    This mismatch misaligns the offsets used for reading (a small model of the effect follows below).
    This PR ensures that the read offsets are accumulated in the same order as the writes.
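    
    To make the mismatch concrete, here is a minimal, self-contained model (hypothetical stream lengths, plain Java instead of ORC's classes) of how the two orders diverge: the writer lays out plain index, encrypted index, plain data, encrypted data, while the old reader accumulated offsets over all plain streams first and all encrypted streams second.
    ```java
    import java.util.Arrays;
    import java.util.List;
    
    public class OffsetOrderSketch {
      static final class Stream {
        final String name;
        final long length;
        Stream(String name, long length) { this.name = name; this.length = length; }
      }
    
      // Walk the streams in the given order, printing where the reader
      // believes each one starts.
      static void printOffsets(String label, List<Stream> order) {
        long offset = 0;
        System.out.println(label);
        for (Stream s : order) {
          System.out.printf("  %-16s @ %3d%n", s.name, offset);
          offset += s.length;
        }
      }
    
      public static void main(String[] args) {
        Stream plainIndex = new Stream("plain index", 10);
        Stream encIndex = new Stream("encrypted index", 20);
        Stream plainData = new Stream("plain data", 100);
        Stream encData = new Stream("encrypted data", 200);
    
        // Physical layout produced by the writer: plain data really
        // starts at offset 30, encrypted data at offset 130.
        printOffsets("write order", Arrays.asList(plainIndex, encIndex, plainData, encData));
        // The old reader's accumulation order: it would look for plain
        // data at offset 10 and the encrypted index at offset 110.
        printOffsets("old read order", Arrays.asList(plainIndex, plainData, encIndex, encData));
      }
    }
    ```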
    
    3. Fix Decimal64TreeWriter creating its DATA stream without binding the encryption variant.
    
    Together, these changes fix the crash when reading encrypted columns.
    
    Added a unit test for reading encrypted columns.
    
    (cherry picked from commit 792c3f820d0b7a64b27c9dc4c390443386e6baf0)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit a3de6daa7840e89470f83cd30aeffc9631e17e63)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/orc/impl/reader/StripePlanner.java  |  34 ++++-
 .../orc/impl/writer/Decimal64TreeWriter.java       |   2 +-
 .../org/apache/orc/impl/writer/TreeWriterBase.java |   2 +-
 .../test/org/apache/orc/impl/TestEncryption.java   | 143 +++++++++++++++++++++
 4 files changed, 175 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
index 463a70c..221e81e 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
@@ -30,6 +30,7 @@ import org.apache.orc.impl.BufferChunkList;
 import org.apache.orc.impl.CryptoUtils;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.jetbrains.annotations.NotNull;
@@ -236,18 +237,20 @@ public class StripePlanner {
    * @param offset the position in the file for this stream
    * @param columnInclude which columns are being read
    * @param stream the stream to consider
+   * @param area only the area will be included
    * @param variant the variant being read
    * @return the offset for the next stream
    */
   private long handleStream(long offset,
                             boolean[] columnInclude,
                             OrcProto.Stream stream,
+                            StreamName.Area area,
                             ReaderEncryptionVariant variant) {
     int column = stream.getColumn();
     if (stream.hasKind()) {
       OrcProto.Stream.Kind kind = stream.getKind();
 
-      if (kind == OrcProto.Stream.Kind.ENCRYPTED_INDEX ||
+      if (StreamName.getArea(kind) != area || kind == OrcProto.Stream.Kind.ENCRYPTED_INDEX ||
               kind == OrcProto.Stream.Kind.ENCRYPTED_DATA) {
         // Ignore the placeholders that shouldn't count toward moving the
         // offsets.
@@ -286,6 +289,8 @@ public class StripePlanner {
 
   /**
    * Find the complete list of streams.
+   * CurrentOffset total order must be consistent with write
+   * {@link PhysicalFsWriter#finalizeStripe}
    * @param streamStart the starting offset of streams in the file
    * @param footer the footer for the stripe
    * @param columnInclude which columns are being read
@@ -295,19 +300,40 @@ public class StripePlanner {
                            boolean[] columnInclude) throws IOException {
     long currentOffset = streamStart;
     Arrays.fill(bloomFilterKinds, null);
+    // +-----------------+---------------+-----------------+---------------+
+    // |                 |               |                 |               |
+    // |   unencrypted   |   encrypted   |   unencrypted   |   encrypted   |
+    // |      index      |     index     |      data       |     data      |
+    // |                 |               |                 |               |
+    // +-----------------+---------------+-----------------+---------------+
+    // Storage layout of index and data, So we need to find the streams in this order
+    //
+    // find index streams, encrypted first and then unencrypted
+    currentOffset = findStreamsByArea(currentOffset, footer, StreamName.Area.INDEX, columnInclude);
+
+    // find data streams, encrypted first and then unencrypted
+    findStreamsByArea(currentOffset, footer, StreamName.Area.DATA, columnInclude);
+  }
+
+  private long findStreamsByArea(long currentOffset,
+                                OrcProto.StripeFooter footer,
+                                StreamName.Area area,
+                                boolean[] columnInclude) {
+    // find unencrypted streams
     for(OrcProto.Stream stream: footer.getStreamsList()) {
-      currentOffset += handleStream(currentOffset, columnInclude, stream, null);
+      currentOffset += handleStream(currentOffset, columnInclude, stream, area, null);
     }
 
-    // Add the encrypted streams that we are using
+    // find encrypted streams
     for(ReaderEncryptionVariant variant: encryption.getVariants()) {
       int variantId = variant.getVariantId();
       OrcProto.StripeEncryptionVariant stripeVariant =
           footer.getEncryption(variantId);
       for(OrcProto.Stream stream: stripeVariant.getStreamsList()) {
-        currentOffset += handleStream(currentOffset, columnInclude, stream, variant);
+        currentOffset += handleStream(currentOffset, columnInclude, stream, area, variant);
       }
     }
+    return currentOffset;
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
index d06f3a5..515fbf9 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
@@ -45,7 +45,7 @@ public class Decimal64TreeWriter extends TreeWriterBase {
                              WriterContext writer) throws IOException {
     super(schema, encryption, writer);
     OutStream stream = writer.createStream(
-        new StreamName(id, OrcProto.Stream.Kind.DATA));
+        new StreamName(id, OrcProto.Stream.Kind.DATA, encryption));
     // Use RLEv2 until we have the new RLEv3.
     valueWriter = new RunLengthIntegerWriterV2(stream, true, true);
     scale = schema.getScale();
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
index ba41837..0228246 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
@@ -280,7 +280,7 @@ public abstract class TreeWriterBase implements TreeWriter {
             "index entries found: " + rowIndex.getEntryCount() + " expected: " +
             requiredIndexEntries);
       }
-      context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+      context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX, encryption), rowIndex);
       rowIndex.clear();
       rowIndexEntry.clear();
     }
diff --git a/java/core/src/test/org/apache/orc/impl/TestEncryption.java b/java/core/src/test/org/apache/orc/impl/TestEncryption.java
new file mode 100644
index 0000000..821547e
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestEncryption.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.InMemoryKeystore;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestEncryption {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir"));
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  TypeDescription schema;
+  KeyProvider keyProvider;
+  String encryption;
+  String mask;
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), VectorizedRowBatch.DEFAULT_SIZE);
+    fs = FileSystem.getLocal(conf);
+    fs.setWorkingDirectory(workDir);
+    testFilePath = new Path("testWriterImpl.orc");
+    fs.create(testFilePath, true);
+    schema = TypeDescription.fromString("struct<id:int,name:string>");
+    byte[] kmsKey = "secret123".getBytes(StandardCharsets.UTF_8);
+    keyProvider = new InMemoryKeystore()
+        .addKey("pii", EncryptionAlgorithm.AES_CTR_128, kmsKey);
+    encryption = "pii:id,name";
+    mask = "sha256:id,name";
+  }
+
+  @After
+  public void deleteTestFile() throws Exception {
+    fs.delete(testFilePath, false);
+  }
+
+  private void write() throws IOException {
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .overwrite(true)
+            .setKeyProvider(keyProvider)
+            .encrypt(encryption)
+            .masks(mask));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    LongColumnVector id = (LongColumnVector) batch.cols[0];
+    BytesColumnVector name = (BytesColumnVector) batch.cols[1];
+    for (int r = 0; r < VectorizedRowBatch.DEFAULT_SIZE * 2; ++r) {
+      int row = batch.size++;
+      id.vector[row] = r;
+      byte[] buffer = ("name-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
+      name.setRef(row, buffer, 0, buffer.length);
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+
+  private void read(boolean pushDown) throws IOException {
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).setKeyProvider(keyProvider));
+    SearchArgument searchArgument = pushDown ? SearchArgumentFactory.newBuilder()
+        .equals("id", PredicateLeaf.Type.LONG, (long) VectorizedRowBatch.DEFAULT_SIZE)
+        .build() : null;
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Reader.Options options = reader.options().schema(this.schema);
+    if (pushDown) {
+      options = options.searchArgument(searchArgument, new String[]{"id"});
+    }
+    RecordReader rowIterator = reader.rows(options);
+    LongColumnVector idColumn = (LongColumnVector) batch.cols[0];
+    BytesColumnVector nameColumn = (BytesColumnVector) batch.cols[1];
+    int batchNum = pushDown ? 1 : 0;
+    while (rowIterator.nextBatch(batch)) {
+      for (int row = 0; row < batch.size; ++row) {
+        long value = row + ((long) batchNum * VectorizedRowBatch.DEFAULT_SIZE);
+        assertEquals(value, idColumn.vector[row]);
+        assertEquals("name-" + (value * 3), nameColumn.toString(row));
+      }
+      batchNum ++;
+    }
+    rowIterator.close();
+  }
+
+  @Test
+  public void testReadEncryption() throws IOException {
+    write();
+    read(false);
+  }
+
+  @Test
+  public void testPushDownReadEncryption() throws IOException {
+    write();
+    read(true);
+  }
+
+}