Posted to commits@orc.apache.org by do...@apache.org on 2021/09/12 18:27:02 UTC
[orc] branch branch-1.7 updated: ORC-991: Fix the bug of encrypted column read crash (#905)
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new a3de6da ORC-991: Fix the bug of encrypted column read crash (#905)
a3de6da is described below
commit a3de6daa7840e89470f83cd30aeffc9631e17e63
Author: Guiyanakaung <gu...@gmail.com>
AuthorDate: Mon Sep 13 02:26:15 2021 +0800
ORC-991: Fix the bug of encrypted column read crash (#905)
### What changes were proposed in this pull request?
1. The row index is never marked as encrypted. Passing the encryption variant to the StreamName constructor marks the row index as encrypted when needed.
TreeWriterBase.java
```java
public void writeStripe(int requiredIndexEntries) throws IOException {
  .....
-  context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+  context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX, encryption), rowIndex);
  .....
}
```
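The idea can be sketched with a simplified, hypothetical stand-in for StreamName (illustrative only, not the ORC source): a stream name built without an encryption variant can never land in an encrypted area, which is exactly what happened to the row index.
```java
// Illustrative sketch only; the real class is org.apache.orc.impl.StreamName.
final class SketchStreamName {
  enum Area { INDEX, DATA }

  final int column;
  final Area area;
  final boolean encrypted;

  // The two-argument form can never mark the stream as encrypted ...
  SketchStreamName(int column, Area area) {
    this(column, area, null);
  }

  // ... so encrypted columns must pass their encryption variant explicitly,
  // as the fixed writeStripe call above now does for ROW_INDEX streams.
  SketchStreamName(int column, Area area, Object encryptionVariant) {
    this.column = column;
    this.area = area;
    this.encrypted = encryptionVariant != null;
  }
}
```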
2. The order in which findStreams in StripePlanner.java accumulates the total offset is inconsistent with the order in which the streams are written.
finalizeStripe in PhysicalFsWriter.java writes the streams in this order:
> 1. write the unencrypted index streams
> 2. write the encrypted index streams
> 3. write the unencrypted data streams
> 4. write the encrypted data streams
```java
@Override
public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder,
                           OrcProto.StripeInformation.Builder dirEntry
                           ) throws IOException {
  .....
  // write the unencrypted index streams
  unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
  // write the encrypted index streams
  for (VariantTracker variant: variants.values()) {
    variant.writeStreams(StreamName.Area.INDEX, rawWriter);
  }
  // write the unencrypted data streams
  unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
  // write out the encrypted data streams
  for (VariantTracker variant: variants.values()) {
    variant.writeStreams(StreamName.Area.DATA, rawWriter);
  }
  .....
}
```
findStreams in StripePlanner.java, however, accumulates the offsets in this order:
> 1. accumulate the total offset over the unencrypted index/data streams
> 2. accumulate the total offset over the encrypted index/data streams
```java
private void findStreams(long streamStart,
                         OrcProto.StripeFooter footer,
                         boolean[] columnInclude) throws IOException {
  long currentOffset = streamStart;
  Arrays.fill(bloomFilterKinds, null);
  for(OrcProto.Stream stream: footer.getStreamsList()) {
    currentOffset += handleStream(currentOffset, columnInclude, stream, null);
  }
  // Add the encrypted streams that we are using
  for(ReaderEncryptionVariant variant: encryption.getVariants()) {
    int variantId = variant.getVariantId();
    OrcProto.StripeEncryptionVariant stripeVariant =
        footer.getEncryption(variantId);
    for(OrcProto.Stream stream: stripeVariant.getStreamsList()) {
      currentOffset += handleStream(currentOffset, columnInclude, stream, variant);
    }
  }
}
```
This mismatch misaligns the computed read offsets against the stream layout on disk.
This PR ensures that the read offsets are accumulated in the same order the streams were written.
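To see the misalignment concretely, here is a hypothetical walk-through with invented stream lengths; nothing below is ORC source code, it only mimics the offset arithmetic:
```java
public class OffsetOrderSketch {
  public static void main(String[] args) {
    // Invented lengths. On disk the writer lays the stripe out as:
    //   unencrypted index [0, 100) | encrypted index [100, 140) |
    //   unencrypted data [140, 640) | encrypted data [640, 840)
    long unencIndexLen = 100, encIndexLen = 40, unencDataLen = 500, encDataLen = 200;

    // Old reader: every unencrypted stream first, then every encrypted one.
    long off = 0;
    System.out.println("old: unencrypted index at " + off); // 0   - correct
    off += unencIndexLen;
    System.out.println("old: unencrypted data  at " + off); // 100 - on disk it is 140
    off += unencDataLen;
    System.out.println("old: encrypted index   at " + off); // 600 - on disk it is 100
    off += encIndexLen;
    System.out.println("old: encrypted data    at " + off); // 640 - matches only because
    // addition commutes; every stream between the first and the last is misaligned.

    // Fixed reader: walk the areas in the same order the writer wrote them.
    off = 0;
    System.out.println("new: unencrypted index at " + off); // 0
    off += unencIndexLen;
    System.out.println("new: encrypted index   at " + off); // 100
    off += encIndexLen;
    System.out.println("new: unencrypted data  at " + off); // 140
    off += unencDataLen;
    System.out.println("new: encrypted data    at " + off); // 640
    off += encDataLen;
    System.out.println("new: stripe ends at    " + off);    // 840
  }
}
```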
3. Fix Decimal64TreeWriter creating its DATA stream without binding the encryption variant, as shown below.
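As in point 1, the fix passes the encryption variant to the StreamName constructor (this mirrors the diff below):
Decimal64TreeWriter.java
```java
OutStream stream = writer.createStream(
-    new StreamName(id, OrcProto.Stream.Kind.DATA));
+    new StreamName(id, OrcProto.Stream.Kind.DATA, encryption));
```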
### Why are the changes needed?
Fix the bug of encrypted column read crash.
### How was this patch tested?
Added a unit test for reading encrypted columns.
(cherry picked from commit 792c3f820d0b7a64b27c9dc4c390443386e6baf0)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/orc/impl/reader/StripePlanner.java | 34 ++++-
.../orc/impl/writer/Decimal64TreeWriter.java | 2 +-
.../org/apache/orc/impl/writer/TreeWriterBase.java | 2 +-
.../test/org/apache/orc/impl/TestEncryption.java | 143 +++++++++++++++++++++
4 files changed, 175 insertions(+), 6 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
index a08d664..0636454 100644
--- a/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
+++ b/java/core/src/java/org/apache/orc/impl/reader/StripePlanner.java
@@ -30,6 +30,7 @@ import org.apache.orc.impl.BufferChunkList;
 import org.apache.orc.impl.CryptoUtils;
 import org.apache.orc.impl.InStream;
 import org.apache.orc.impl.OrcIndex;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.RecordReaderUtils;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.reader.tree.TypeReader;
@@ -273,18 +274,20 @@ public class StripePlanner {
    * @param offset the position in the file for this stream
    * @param columnInclude which columns are being read
    * @param stream the stream to consider
+   * @param area only streams in this area will be counted
    * @param variant the variant being read
    * @return the offset for the next stream
    */
   private long handleStream(long offset,
                             boolean[] columnInclude,
                             OrcProto.Stream stream,
+                            StreamName.Area area,
                             ReaderEncryptionVariant variant) {
     int column = stream.getColumn();
     if (stream.hasKind()) {
       OrcProto.Stream.Kind kind = stream.getKind();
-      if (kind == OrcProto.Stream.Kind.ENCRYPTED_INDEX ||
+      if (StreamName.getArea(kind) != area || kind == OrcProto.Stream.Kind.ENCRYPTED_INDEX ||
           kind == OrcProto.Stream.Kind.ENCRYPTED_DATA) {
         // Ignore the placeholders that shouldn't count toward moving the
         // offsets.
@@ -323,6 +326,8 @@ public class StripePlanner {
   /**
    * Find the complete list of streams.
+   * The order in which currentOffset accumulates must be consistent with the
+   * write order in {@link PhysicalFsWriter#finalizeStripe}.
    * @param streamStart the starting offset of streams in the file
    * @param footer the footer for the stripe
    * @param columnInclude which columns are being read
@@ -332,19 +337,40 @@ public class StripePlanner {
                            boolean[] columnInclude) throws IOException {
     long currentOffset = streamStart;
     Arrays.fill(bloomFilterKinds, null);
+    // +-----------------+---------------+-----------------+---------------+
+    // |                 |               |                 |               |
+    // |   unencrypted   |   encrypted   |   unencrypted   |   encrypted   |
+    // |      index      |     index     |      data       |     data      |
+    // |                 |               |                 |               |
+    // +-----------------+---------------+-----------------+---------------+
+    // This is the storage layout of the index and data areas, so we must find the streams in this order.
+    //
+    // find index streams, unencrypted first and then encrypted
+    currentOffset = findStreamsByArea(currentOffset, footer, StreamName.Area.INDEX, columnInclude);
+
+    // find data streams, unencrypted first and then encrypted
+    findStreamsByArea(currentOffset, footer, StreamName.Area.DATA, columnInclude);
+  }
+
+  private long findStreamsByArea(long currentOffset,
+                                 OrcProto.StripeFooter footer,
+                                 StreamName.Area area,
+                                 boolean[] columnInclude) {
+    // find unencrypted streams
     for(OrcProto.Stream stream: footer.getStreamsList()) {
-      currentOffset += handleStream(currentOffset, columnInclude, stream, null);
+      currentOffset += handleStream(currentOffset, columnInclude, stream, area, null);
     }
-    // Add the encrypted streams that we are using
+    // find encrypted streams
     for(ReaderEncryptionVariant variant: encryption.getVariants()) {
       int variantId = variant.getVariantId();
       OrcProto.StripeEncryptionVariant stripeVariant =
           footer.getEncryption(variantId);
       for(OrcProto.Stream stream: stripeVariant.getStreamsList()) {
-        currentOffset += handleStream(currentOffset, columnInclude, stream, variant);
+        currentOffset += handleStream(currentOffset, columnInclude, stream, area, variant);
       }
     }
+    return currentOffset;
   }
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
index d06f3a5..515fbf9 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
@@ -45,7 +45,7 @@ public class Decimal64TreeWriter extends TreeWriterBase {
                              WriterContext writer) throws IOException {
     super(schema, encryption, writer);
     OutStream stream = writer.createStream(
-        new StreamName(id, OrcProto.Stream.Kind.DATA));
+        new StreamName(id, OrcProto.Stream.Kind.DATA, encryption));
     // Use RLEv2 until we have the new RLEv3.
     valueWriter = new RunLengthIntegerWriterV2(stream, true, true);
     scale = schema.getScale();
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
index 0369f98..ef93a31 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
@@ -279,7 +279,7 @@ public abstract class TreeWriterBase implements TreeWriter {
"index entries found: " + rowIndex.getEntryCount() + " expected: " +
requiredIndexEntries);
}
- context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+ context.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX, encryption), rowIndex);
rowIndex.clear();
rowIndexEntry.clear();
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestEncryption.java b/java/core/src/test/org/apache/orc/impl/TestEncryption.java
new file mode 100644
index 0000000..64fcbcf
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestEncryption.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.InMemoryKeystore;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestEncryption {
+
+  Path workDir = new Path(System.getProperty("test.tmp.dir"));
+  Configuration conf;
+  FileSystem fs;
+  Path testFilePath;
+  TypeDescription schema;
+  KeyProvider keyProvider;
+  String encryption;
+  String mask;
+
+  @BeforeEach
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    conf.setInt(OrcConf.ROW_INDEX_STRIDE.getAttribute(), VectorizedRowBatch.DEFAULT_SIZE);
+    fs = FileSystem.getLocal(conf);
+    fs.setWorkingDirectory(workDir);
+    testFilePath = new Path("testWriterImpl.orc");
+    fs.create(testFilePath, true);
+    schema = TypeDescription.fromString("struct<id:int,name:string>");
+    byte[] kmsKey = "secret123".getBytes(StandardCharsets.UTF_8);
+    keyProvider = new InMemoryKeystore()
+        .addKey("pii", EncryptionAlgorithm.AES_CTR_128, kmsKey);
+    encryption = "pii:id,name";
+    mask = "sha256:id,name";
+  }
+
+  @AfterEach
+  public void deleteTestFile() throws Exception {
+    fs.delete(testFilePath, false);
+  }
+
+  private void write() throws IOException {
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .overwrite(true)
+            .setKeyProvider(keyProvider)
+            .encrypt(encryption)
+            .masks(mask));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    LongColumnVector id = (LongColumnVector) batch.cols[0];
+    BytesColumnVector name = (BytesColumnVector) batch.cols[1];
+    for (int r = 0; r < VectorizedRowBatch.DEFAULT_SIZE * 2; ++r) {
+      int row = batch.size++;
+      id.vector[row] = r;
+      byte[] buffer = ("name-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
+      name.setRef(row, buffer, 0, buffer.length);
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+
+  private void read(boolean pushDown) throws IOException {
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).setKeyProvider(keyProvider));
+    SearchArgument searchArgument = pushDown ? SearchArgumentFactory.newBuilder()
+        .equals("id", PredicateLeaf.Type.LONG, (long) VectorizedRowBatch.DEFAULT_SIZE)
+        .build() : null;
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Reader.Options options = reader.options().schema(this.schema);
+    if (pushDown) {
+      options = options.searchArgument(searchArgument, new String[]{"id"});
+    }
+    RecordReader rowIterator = reader.rows(options);
+    LongColumnVector idColumn = (LongColumnVector) batch.cols[0];
+    BytesColumnVector nameColumn = (BytesColumnVector) batch.cols[1];
+    int batchNum = pushDown ? 1 : 0;
+    while (rowIterator.nextBatch(batch)) {
+      for (int row = 0; row < batch.size; ++row) {
+        long value = row + ((long) batchNum * VectorizedRowBatch.DEFAULT_SIZE);
+        assertEquals(value, idColumn.vector[row]);
+        assertEquals("name-" + (value * 3), nameColumn.toString(row));
+      }
+      batchNum++;
+    }
+    rowIterator.close();
+  }
+
+  @Test
+  public void testReadEncryption() throws IOException {
+    write();
+    read(false);
+  }
+
+  @Test
+  public void testPushDownReadEncryption() throws IOException {
+    write();
+    read(true);
+  }
+
+}