You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2021/09/12 18:27:02 UTC

[orc] branch branch-1.7 updated: ORC-991: Fix the bug of encrypted column read crash (#905)

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new a3de6da  ORC-991: Fix the bug of encrypted column read crash (#905)
a3de6da is described below

commit a3de6daa7840e89470f83cd30aeffc9631e17e63
Author: Guiyanakaung <gu...@gmail.com>
AuthorDate: Mon Sep 13 02:26:15 2021 +0800

    ORC-991: Fix the bug of encrypted column read crash (#905)
    
    ### What changes were proposed in this pull request?
    
    1. The ROW_INDEX stream was never marked as encrypted. Pass `encryption` to the StreamName constructor so that the row index is encrypted when needed.
    
    TreeWriterBase.java
    ```java
      public void writeStripe(int requiredIndexEntries) throws IOException {
          .....
    -    context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
    +    context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX, encryption), rowIndex);
          .....
      }
    ```
    
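    2. The order in which findStreams in StripePlanner.java accumulates stream offsets is inconsistent with the order in which the streams are written.
    finalizeStripe in PhysicalFsWriter.java writes the streams in this order: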
    > 1. write the unencrypted index streams
    > 2. write the encrypted index streams
    > 3. write the unencrypted data streams
    > 4. write the encrypted data streams
    ```java
      @Override
      public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
                                 OrcProto.StripeInformation.Builder dirEntry
                                 ) throws IOException {
        .....
        // write the unencrypted index streams
        unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
        // write the encrypted index streams
        for (VariantTracker variant: variants.values()) {
          variant.writeStreams(StreamName.Area.INDEX, rawWriter);
        }
    
        // write the unencrypted data streams
        unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
        // write out the encrypted data streams
        for (VariantTracker variant: variants.values()) {
          variant.writeStreams(StreamName.Area.DATA, rawWriter);
        }
        .....
      }
    ```
    
    findStreams in StripePlanner.java, however, accumulates offsets in this order:
    > 1. all unencrypted streams (index and data together)
    > 2. all encrypted streams (index and data together)
    
    ```java
      private void findStreams(long streamStart,
                               OrcProto.StripeFooter footer,
                               boolean[] columnInclude) throws IOException {
        long currentOffset = streamStart;
        Arrays.fill(bloomFilterKinds, null);
        for(OrcProto.Stream stream: footer.getStreamsList()) {
          currentOffset += handleStream(currentOffset, columnInclude, stream, null);
        }
    
        // Add the encrypted streams that we are using
        for(ReaderEncryptionVariant variant: encryption.getVariants()) {
          int variantId = variant.getVariantId();
          OrcProto.StripeEncryptionVariant stripeVariant =
              footer.getEncryption(variantId);
          for(OrcProto.Stream stream: stripeVariant.getStreamsList()) {
            currentOffset += handleStream(currentOffset, columnInclude, stream, variant);
          }
        }
      }
    ```
    
    This mismatch causes streams to be read from the wrong offsets.
    This PR ensures that the read offsets are computed in the same order as the streams are written.
    
    3. Fix the Decimal64TreeWriter DATA stream being created without the column's encryption.
    
    ### Why are the changes needed?
    
    Fix the bug of encrypted column read crash.
    
    ### How was this patch tested?
    
    Added unit test for reading encrypted columns.
    
    (cherry picked from commit 792c3f820d0b7a64b27c9dc4c390443386e6baf0)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/orc/impl/reader/StripePlanner.java  |  34 ++++-
 .../orc/impl/writer/Decimal64TreeWriter.java       |   2 +-
 .../org/apache/orc/impl/writer/TreeWriterBase.java |   2 +-
 .../test/org/apache/orc/impl/TestEncryption.java   | 143 +++++++++++++++++++++
 4 files changed, 175 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
index a08d664..0636454 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
@@ -30,6 +30,7 @@ import org.apache.orc.impl.BufferChunkList;
 import org.apache.orc.impl.CryptoUtils;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.reader.tree.TypeReader;
@@ -273,18 +274,20 @@ public class StripePlanner {
    * @param offset the position in the file for this stream
    * @param columnInclude which columns are being read
    * @param stream the stream to consider
+   * @param area only the area will be included
    * @param variant the variant being read
    * @return the offset for the next stream
    */
   private long handleStream(long offset,
                             boolean[] columnInclude,
                             OrcProto.Stream stream,
+                            StreamName.Area area,
                             ReaderEncryptionVariant variant) {
     int column = stream.getColumn();
     if (stream.hasKind()) {
       OrcProto.Stream.Kind kind = stream.getKind();
 
-      if (kind == OrcProto.Stream.Kind.ENCRYPTED_INDEX ||
+      if (StreamName.getArea(kind) != area || kind == OrcProto.Stream.Kind.ENCRYPTED_INDEX ||
               kind == OrcProto.Stream.Kind.ENCRYPTED_DATA) {
         // Ignore the placeholders that shouldn't count toward moving the
         // offsets.
@@ -323,6 +326,8 @@ public class StripePlanner {
 
   /**
    * Find the complete list of streams.
+   * The order in which offsets are accumulated must be consistent with the write order in
+   * {@link PhysicalFsWriter#finalizeStripe}
    * @param streamStart the starting offset of streams in the file
    * @param footer the footer for the stripe
    * @param columnInclude which columns are being read
@@ -332,19 +337,40 @@ public class StripePlanner {
                            boolean[] columnInclude) throws IOException {
     long currentOffset = streamStart;
     Arrays.fill(bloomFilterKinds, null);
+    // +-----------------+---------------+-----------------+---------------+
+    // |                 |               |                 |               |
+    // |   unencrypted   |   encrypted   |   unencrypted   |   encrypted   |
+    // |      index      |     index     |      data       |     data      |
+    // |                 |               |                 |               |
+    // +-----------------+---------------+-----------------+---------------+
+    // The streams are stored in the layout above, so we need to find them in that order.
+    //
+    // find index streams, unencrypted first and then encrypted
+    currentOffset = findStreamsByArea(currentOffset, footer, StreamName.Area.INDEX, columnInclude);
+
+    // find data streams, unencrypted first and then encrypted
+    findStreamsByArea(currentOffset, footer, StreamName.Area.DATA, columnInclude);
+  }
+
+  private long findStreamsByArea(long currentOffset,
+                                OrcProto.StripeFooter footer,
+                                StreamName.Area area,
+                                boolean[] columnInclude) {
+    // find unencrypted streams
     for(OrcProto.Stream stream: footer.getStreamsList()) {
-      currentOffset += handleStream(currentOffset, columnInclude, stream, null);
+      currentOffset += handleStream(currentOffset, columnInclude, stream, area, null);
     }
 
-    // Add the encrypted streams that we are using
+    // find encrypted streams
     for(ReaderEncryptionVariant variant: encryption.getVariants()) {
       int variantId = variant.getVariantId();
       OrcProto.StripeEncryptionVariant stripeVariant =
           footer.getEncryption(variantId);
       for(OrcProto.Stream stream: stripeVariant.getStreamsList()) {
-        currentOffset += handleStream(currentOffset, columnInclude, stream, variant);
+        currentOffset += handleStream(currentOffset, columnInclude, stream, area, variant);
       }
     }
+    return currentOffset;
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
index d06f3a5..515fbf9 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
@@ -45,7 +45,7 @@ public class Decimal64TreeWriter extends TreeWriterBase {
                              WriterContext writer) throws IOException {
     super(schema, encryption, writer);
     OutStream stream = writer.createStream(
-        new StreamName(id, OrcProto.Stream.Kind.DATA));
+        new StreamName(id, OrcProto.Stream.Kind.DATA, encryption));
     // Use RLEv2 until we have the new RLEv3.
     valueWriter = new RunLengthIntegerWriterV2(stream, true, true);
     scale = schema.getScale();
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
index 0369f98..ef93a31 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
@@ -279,7 +279,7 @@ public abstract class TreeWriterBase implements TreeWriter {
             "index entries found: " + rowIndex.getEntryCount() + " expected: " +
             requiredIndexEntries);
       }
-      context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+      context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX, encryption), rowIndex);
       rowIndex.clear();
       rowIndexEntry.clear();
     }
diff --git a/java/core/src/test/org/apache/orc/impl/TestEncryption.java b/java/core/src/test/org/apache/orc/impl/TestEncryption.java
new file mode 100644
index 0000000..64fcbcf
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestEncryption.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.InMemoryKeystore;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestEncryption {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir"));
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  TypeDescription schema;
+  KeyProvider keyProvider;
+  String encryption;
+  String mask;
+
+  @BeforeEach
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), VectorizedRowBatch.DEFAULT_SIZE);
+    fs = FileSystem.getLocal(conf);
+    fs.setWorkingDirectory(workDir);
+    testFilePath = new Path("testWriterImpl.orc");
+    fs.create(testFilePath, true);
+    schema = TypeDescription.fromString("struct<id:int,name:string>");
+    byte[] kmsKey = "secret123".getBytes(StandardCharsets.UTF_8);
+    keyProvider = new InMemoryKeystore()
+        .addKey("pii", EncryptionAlgorithm.AES_CTR_128, kmsKey);
+    encryption = "pii:id,name";
+    mask = "sha256:id,name";
+  }
+
+  @AfterEach
+  public void deleteTestFile() throws Exception {
+    fs.delete(testFilePath, false);
+  }
+
+  private void write() throws IOException {
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .overwrite(true)
+            .setKeyProvider(keyProvider)
+            .encrypt(encryption)
+            .masks(mask));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    LongColumnVector id = (LongColumnVector) batch.cols[0];
+    BytesColumnVector name = (BytesColumnVector) batch.cols[1];
+    for (int r = 0; r < VectorizedRowBatch.DEFAULT_SIZE * 2; ++r) {
+      int row = batch.size++;
+      id.vector[row] = r;
+      byte[] buffer = ("name-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
+      name.setRef(row, buffer, 0, buffer.length);
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+
+  private void read(boolean pushDown) throws IOException {
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).setKeyProvider(keyProvider));
+    SearchArgument searchArgument = pushDown ? SearchArgumentFactory.newBuilder()
+        .equals("id", PredicateLeaf.Type.LONG, (long) VectorizedRowBatch.DEFAULT_SIZE)
+        .build() : null;
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Reader.Options options = reader.options().schema(this.schema);
+    if (pushDown) {
+      options = options.searchArgument(searchArgument, new String[]{"id"});
+    }
+    RecordReader rowIterator = reader.rows(options);
+    LongColumnVector idColumn = (LongColumnVector) batch.cols[0];
+    BytesColumnVector nameColumn = (BytesColumnVector) batch.cols[1];
+    int batchNum = pushDown ? 1 : 0;
+    while (rowIterator.nextBatch(batch)) {
+      for (int row = 0; row < batch.size; ++row) {
+        long value = row + ((long) batchNum * VectorizedRowBatch.DEFAULT_SIZE);
+        assertEquals(value, idColumn.vector[row]);
+        assertEquals("name-" + (value * 3), nameColumn.toString(row));
+      }
+      batchNum ++;
+    }
+    rowIterator.close();
+  }
+
+  @Test
+  public void testReadEncryption() throws IOException {
+    write();
+    read(false);
+  }
+
+  @Test
+  public void testPushDownReadEncryption() throws IOException {
+    write();
+    read(true);
+  }
+
+}