You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2018/07/13 21:28:49 UTC
[1/2] orc git commit: ORC-251: Extend InStream and OutStream to
support encryption.
Repository: orc
Updated Branches:
refs/heads/master f3dd9c159 -> edbb9673d
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestInStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index a27c49a..fe7b53d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,29 +18,35 @@
package org.apache.orc.impl;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.fail;
-
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.Key;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Test;
+import javax.crypto.spec.SecretKeySpec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
public class TestInStream {
public static class OutputCollector implements PhysicalWriter.OutputReceiver {
public DynamicByteArray buffer = new DynamicByteArray();
@Override
- public void output(ByteBuffer buffer) throws IOException {
+ public void output(ByteBuffer buffer) {
this.buffer.add(buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
}
@@ -53,7 +59,7 @@ public class TestInStream {
static class PositionCollector
implements PositionProvider, PositionRecorder {
- private List<Long> positions = new ArrayList<Long>();
+ private List<Long> positions = new ArrayList<>();
private int index = 0;
@Override
@@ -101,10 +107,10 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), null, 100);
+ InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+ inBuf.remaining());
assertEquals("uncompressed stream test position: 0 length: 1024" +
- " range: 0 offset: 0 limit: 0",
+ " range: 0 offset: 0 limit: 1024",
in.toString());
for(int i=0; i < 1024; ++i) {
int x = in.read();
@@ -117,6 +123,121 @@ public class TestInStream {
}
@Test
+ public void testEncrypted() throws Exception {
+ final long DATA_CONST = 0x1_0000_0003L;
+ final int ROW_COUNT = 1024;
+ OutputCollector collect = new OutputCollector();
+ EncryptionAlgorithm algorithm = EncryptionAlgorithm.AES_128;
+ byte[] rawKey = new byte[algorithm.keyLength()];
+ for(int i=0; i < rawKey.length; ++i) {
+ rawKey[i] = (byte) i;
+ }
+ Key decryptKey = new SecretKeySpec(rawKey, algorithm.getAlgorithm());
+ StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
+ byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);
+ StreamOptions writerOptions = new StreamOptions(100)
+ .withEncryption(algorithm, decryptKey, iv);
+ OutStream out = new OutStream("test", writerOptions, collect);
+ PositionCollector[] positions = new PositionCollector[ROW_COUNT];
+ DataOutputStream outStream = new DataOutputStream(out);
+ for(int i=0; i < ROW_COUNT; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ outStream.writeLong(i * DATA_CONST);
+ }
+ out.flush();
+ assertEquals(ROW_COUNT * 8, collect.buffer.size());
+
+ // Split the stream into three ranges, making sure that the range
+ // boundaries don't fall on the 16 byte AES block boundaries.
+ int[] rangeSizes = {1965, ROW_COUNT * 8 - 1965 - 15, 15};
+ int offset = 0;
+ BufferChunkList list = new BufferChunkList();
+ for(int size: rangeSizes) {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ collect.buffer.setByteBuffer(buffer, offset, size);
+ buffer.flip();
+ list.add(new BufferChunk(buffer, offset));
+ offset += size;
+ }
+
+ InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+ InStream.options().withEncryption(algorithm, decryptKey, iv));
+ assertEquals("encrypted uncompressed stream test position: 0 length: 8192" +
+ " range: 0 offset: 0 limit: 1965",
+ in.toString());
+ DataInputStream inputStream = new DataInputStream(in);
+ for(int i=0; i < ROW_COUNT; ++i) {
+ assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+ }
+ for(int i=ROW_COUNT - 1; i >= 0; --i) {
+ in.seek(positions[i]);
+ assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+ }
+ }
+
+ @Test
+ public void testCompressedEncrypted() throws Exception {
+ final long DATA_CONST = 0x1_0000_0003L;
+ final int ROW_COUNT = 1024;
+ OutputCollector collect = new OutputCollector();
+ EncryptionAlgorithm algorithm = EncryptionAlgorithm.AES_128;
+ byte[] rawKey = new byte[algorithm.keyLength()];
+ for(int i=0; i < rawKey.length; ++i) {
+ rawKey[i] = (byte) i;
+ }
+ Key decryptKey = new SecretKeySpec(rawKey, algorithm.getAlgorithm());
+ StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
+ byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);
+ StreamOptions writerOptions = new StreamOptions(500)
+ .withCodec(new ZlibCodec())
+ .withEncryption(algorithm, decryptKey, iv);
+ OutStream out = new OutStream("test", writerOptions, collect);
+ PositionCollector[] positions = new PositionCollector[ROW_COUNT];
+ DataOutputStream outStream = new DataOutputStream(out);
+ for(int i=0; i < ROW_COUNT; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ outStream.writeLong(i * DATA_CONST);
+ }
+ out.flush();
+ // currently 3957 bytes
+ int compressedSize = collect.buffer.size();
+
+ // Split the stream into three ranges, making sure that the range
+ // boundaries don't fall on the 16 byte AES block boundaries.
+ int[] rangeSizes = {1998, compressedSize - 1998 - 15, 15};
+ int offset = 0;
+ BufferChunkList list = new BufferChunkList();
+ for(int size: rangeSizes) {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ collect.buffer.setByteBuffer(buffer, offset, size);
+ buffer.flip();
+ list.add(new BufferChunk(buffer, offset));
+ offset += size;
+ }
+
+ InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+ InStream.options()
+ .withCodec(new ZlibCodec()).withBufferSize(500)
+ .withEncryption(algorithm, decryptKey, iv));
+ assertEquals("encrypted compressed stream test position: 0 length: " +
+ compressedSize + " range: 0 offset: 0 limit: 1998 range 0 = 0 to" +
+ " 1998; range 1 = 1998 to " + (compressedSize - 15) +
+ "; range 2 = " +
+ (compressedSize - 15) + " to " + compressedSize,
+ in.toString());
+ DataInputStream inputStream = new DataInputStream(in);
+ for(int i=0; i < ROW_COUNT; ++i) {
+ assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+ }
+ for(int i=ROW_COUNT - 1; i >= 0; --i) {
+ in.seek(positions[i]);
+ assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+ }
+ }
+
+ @Test
public void testCompressed() throws Exception {
OutputCollector collect = new OutputCollector();
CompressionCodec codec = new ZlibCodec();
@@ -133,10 +254,11 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), codec, 300);
+ InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+ inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(300));
assertEquals("compressed stream test position: 0 length: 961 range: 0" +
- " offset: 0 limit: 0 range 0 = 0 to 961",
+ " offset: 0 limit: 961 range 0 = 0 to 961",
in.toString());
for(int i=0; i < 1024; ++i) {
int x = in.read();
@@ -166,8 +288,9 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), codec, 100);
+ InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+ inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(100));
byte[] contents = new byte[1024];
try {
in.read(contents);
@@ -181,8 +304,9 @@ public class TestInStream {
inBuf.put((byte) 32);
inBuf.put((byte) 0);
inBuf.flip();
- in = InStream.create("test2", new ByteBuffer[]{inBuf}, new long[]{0},
- inBuf.remaining(), codec, 300);
+ in = InStream.create("test2", new BufferChunk(inBuf, 0),
+ inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(300));
try {
in.read();
fail();
@@ -214,14 +338,19 @@ public class TestInStream {
collect.buffer.setByteBuffer(inBuf[1], 483, 1625 - 483);
collect.buffer.setByteBuffer(inBuf[2], 1625, 1674 - 1625);
- for(int i=0; i < inBuf.length; ++i) {
- inBuf[i].flip();
+ BufferChunkList buffers = new BufferChunkList();
+ int offset = 0;
+ for(ByteBuffer buffer: inBuf) {
+ buffer.flip();
+ buffers.add(new BufferChunk(buffer, offset));
+ offset += buffer.remaining();
}
- InStream in = InStream.create("test", inBuf,
- new long[]{0,483, 1625}, 1674, codec, 400);
+ InStream.StreamOptions options = InStream.options()
+ .withCodec(codec).withBufferSize(400);
+ InStream in = InStream.create("test", buffers.get(), 1674, options);
assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
- " offset: 0 limit: 0 range 0 = 0 to 483;" +
- " range 1 = 483 to 1142; range 2 = 1625 to 49",
+ " offset: 0 limit: 483 range 0 = 0 to 483;" +
+ " range 1 = 483 to 1625; range 2 = 1625 to 1674",
in.toString());
DataInputStream inStream = new DataInputStream(in);
for(int i=0; i < 1024; ++i) {
@@ -234,8 +363,10 @@ public class TestInStream {
assertEquals(i, inStream.readInt());
}
- in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
- new long[]{483, 1625}, 1674, codec, 400);
+ buffers.clear();
+ buffers.add(new BufferChunk(inBuf[1], 483));
+ buffers.add(new BufferChunk(inBuf[2], 1625));
+ in = InStream.create("test", buffers.get(), 1674, options);
inStream = new DataInputStream(in);
positions[303].reset();
in.seek(positions[303]);
@@ -243,8 +374,10 @@ public class TestInStream {
assertEquals(i, inStream.readInt());
}
- in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
- new long[]{0, 1625}, 1674, codec, 400);
+ buffers.clear();
+ buffers.add(new BufferChunk(inBuf[0], 0));
+ buffers.add(new BufferChunk(inBuf[2], 1625));
+ in = InStream.create("test", buffers.get(), 1674, options);
inStream = new DataInputStream(in);
positions[1001].reset();
for(int i=0; i < 300; ++i) {
@@ -278,13 +411,16 @@ public class TestInStream {
collect.buffer.setByteBuffer(inBuf[1], 1024, 2048);
collect.buffer.setByteBuffer(inBuf[2], 3072, 1024);
- for(int i=0; i < inBuf.length; ++i) {
- inBuf[i].flip();
+ for(ByteBuffer buffer: inBuf) {
+ buffer.flip();
}
- InStream in = InStream.create("test", inBuf,
- new long[]{0, 1024, 3072}, 4096, null, 400);
+ BufferChunkList buffers = new BufferChunkList();
+ buffers.add(new BufferChunk(inBuf[0], 0));
+ buffers.add(new BufferChunk(inBuf[1], 1024));
+ buffers.add(new BufferChunk(inBuf[2], 3072));
+ InStream in = InStream.create("test", buffers.get(), 4096);
assertEquals("uncompressed stream test position: 0 length: 4096" +
- " range: 0 offset: 0 limit: 0",
+ " range: 0 offset: 0 limit: 1024",
in.toString());
DataInputStream inStream = new DataInputStream(in);
for(int i=0; i < 1024; ++i) {
@@ -297,8 +433,10 @@ public class TestInStream {
assertEquals(i, inStream.readInt());
}
- in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
- new long[]{1024, 3072}, 4096, null, 400);
+ buffers.clear();
+ buffers.add(new BufferChunk(inBuf[1], 1024));
+ buffers.add(new BufferChunk(inBuf[2], 3072));
+ in = InStream.create("test", buffers.get(), 4096);
inStream = new DataInputStream(in);
positions[256].reset();
in.seek(positions[256]);
@@ -306,8 +444,10 @@ public class TestInStream {
assertEquals(i, inStream.readInt());
}
- in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
- new long[]{0, 3072}, 4096, null, 400);
+ buffers.clear();
+ buffers.add(new BufferChunk(inBuf[0], 0));
+ buffers.add(new BufferChunk(inBuf[2], 3072));
+ in = InStream.create("test", buffers.get(), 4096);
inStream = new DataInputStream(in);
positions[768].reset();
for(int i=0; i < 256; ++i) {
@@ -321,9 +461,8 @@ public class TestInStream {
@Test
public void testEmptyDiskRange() throws IOException {
- List<DiskRange> rangeList = new ArrayList<>();
- rangeList.add(new BufferChunk(ByteBuffer.allocate(0), 0));
- InStream stream = new InStream.UncompressedStream("test", rangeList, 0);
+ DiskRangeList range = new BufferChunk(ByteBuffer.allocate(0), 0);
+ InStream stream = new InStream.UncompressedStream("test", range, 0);
assertEquals(0, stream.available());
stream.seek(new PositionProvider() {
@Override
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
index 399f35e..0888cef 100644
--- a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -55,10 +55,9 @@ public class TestIntegerCompressionReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthIntegerReaderV2 in =
- new RunLengthIntegerReaderV2(InStream.create
- ("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(),
- codec, 1000), true, false);
+ new RunLengthIntegerReaderV2(InStream.create("test",
+ new BufferChunk(inBuf, 0), inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(1000)), true, false);
for(int i=0; i < 2048; ++i) {
int x = (int) in.next();
if (i < 1024) {
@@ -110,10 +109,8 @@ public class TestIntegerCompressionReader {
inBuf.flip();
RunLengthIntegerReaderV2 in =
new RunLengthIntegerReaderV2(InStream.create("test",
- new ByteBuffer[]{inBuf},
- new long[]{0},
- inBuf.remaining(),
- null, 100), true, false);
+ new BufferChunk(inBuf, 0),
+ inBuf.remaining()), true, false);
for(int i=0; i < 2048; i += 10) {
int x = (int) in.next();
if (i < 1024) {
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestOutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index 8abd12b..473c7a6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,17 +19,38 @@
package org.apache.orc.impl;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
+import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestOutStream {
+ public static final boolean TEST_AES_256;
+ static {
+ try {
+ TEST_AES_256 = Cipher.getMaxAllowedKeyLength("AES") != 128;
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalArgumentException("Unknown algorithm", e);
+ }
+ }
+
@Test
public void testFlush() throws Exception {
PhysicalWriter.OutputReceiver receiver =
@@ -44,7 +65,7 @@ public class TestOutStream {
}
@Test
- public void testAssertBufferSizeValid() throws Exception {
+ public void testAssertBufferSizeValid() {
try {
OutStream.assertBufferSizeValid(1 + (1<<23));
fail("Invalid buffer-size " + (1 + (1<<23)) + " should have been blocked.");
@@ -55,4 +76,143 @@ public class TestOutStream {
OutStream.assertBufferSizeValid((1<<23) - 1);
}
+
+ @Test
+ public void testEncryption() throws Exception {
+ TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+ EncryptionAlgorithm aes128 = EncryptionAlgorithm.AES_128;
+ byte[] keyBytes = new byte[aes128.keyLength()];
+ for(int i=0; i < keyBytes.length; ++i) {
+ keyBytes[i] = (byte) i;
+ }
+ Key material = new SecretKeySpec(keyBytes, aes128.getAlgorithm());
+ StreamName name = new StreamName(0x34, OrcProto.Stream.Kind.DATA);
+ // test out stripe 18
+ byte[] iv = CryptoUtils.createIvForStream(aes128, name, 18);
+ StreamOptions options = new StreamOptions(50)
+ .withEncryption(aes128, material, iv);
+ OutStream stream = new OutStream("test", options, receiver);
+ byte[] data = new byte[210];
+ for(int i=0; i < data.length; ++i) {
+ data[i] = (byte) (i+3);
+ }
+
+ stream.write(data);
+ stream.flush();
+ byte[] output = receiver.buffer.get();
+
+ // These are the outputs of aes128 with the key and incrementing ivs.
+ // I included these hardcoded values to make sure that we are getting
+ // AES128 encryption.
+ //
+ // I used http://extranet.cryptomathic.com/aescalc/index to compute these:
+ // key: 000102030405060708090a0b0c0d0e0f
+ // input: 00003400010000120000000000000000
+ // ecb encrypt output: 822252A81CC7E7FE3E51F50E0E9B64B1
+ int[] generated = new int[]{
+ 0x82, 0x22, 0x52, 0xA8, 0x1C, 0xC7, 0xE7, 0xFE, // block 0
+ 0x3E, 0x51, 0xF5, 0x0E, 0x0E, 0x9B, 0x64, 0xB1,
+
+ 0xF6, 0x4D, 0x36, 0xA9, 0xD9, 0xD7, 0x55, 0xDE, // block 1
+ 0xCB, 0xD5, 0x62, 0x0E, 0x6D, 0xA6, 0x6B, 0x16,
+
+ 0x00, 0x0B, 0xE8, 0xBA, 0x9D, 0xDE, 0x78, 0xEC, // block 2
+ 0x73, 0x05, 0xF6, 0x1E, 0x76, 0xD7, 0x9B, 0x7A,
+
+ 0x47, 0xE9, 0x61, 0x90, 0x65, 0x8B, 0x54, 0xAC, // block 3
+ 0xF2, 0x3F, 0x67, 0xAE, 0x25, 0x63, 0x1D, 0x4B,
+
+ 0x41, 0x48, 0xC4, 0x15, 0x5F, 0x2A, 0x7F, 0x91, // block 4
+ 0x9A, 0x87, 0xA1, 0x09, 0xFF, 0x68, 0x68, 0xCC,
+
+ 0xC0, 0x80, 0x52, 0xD4, 0xA5, 0x07, 0x4B, 0x79, // block 5
+ 0xC7, 0x08, 0x46, 0x46, 0x8C, 0x74, 0x2C, 0x0D,
+
+ 0x9F, 0x55, 0x7E, 0xA7, 0x17, 0x47, 0x91, 0xFD, // block 6
+ 0x01, 0xD4, 0x24, 0x1F, 0x76, 0xA1, 0xDC, 0xC3,
+
+ 0xEA, 0x13, 0x4C, 0x29, 0xCA, 0x68, 0x1E, 0x4F, // block 7
+ 0x0D, 0x19, 0xE5, 0x09, 0xF9, 0xC5, 0xF4, 0x15,
+
+ 0x9A, 0xAD, 0xC4, 0xA1, 0x0F, 0x28, 0xD4, 0x3D, // block 8
+ 0x59, 0xF0, 0x68, 0xD3, 0xC4, 0x98, 0x74, 0x68,
+
+ 0x37, 0xA4, 0xF4, 0x7C, 0x02, 0xCE, 0xC6, 0xCA, // block 9
+ 0xA1, 0xF8, 0xC3, 0x8C, 0x7B, 0x72, 0x38, 0xD1,
+
+ 0xAA, 0x52, 0x90, 0xDE, 0x28, 0xA1, 0x53, 0x6E, // block a
+ 0xA6, 0x5C, 0xC0, 0x89, 0xC4, 0x21, 0x76, 0xC0,
+
+ 0x1F, 0xED, 0x0A, 0xF9, 0xA2, 0xA7, 0xC1, 0x8D, // block b
+ 0xA0, 0x92, 0x44, 0x4F, 0x60, 0x51, 0x7F, 0xD8,
+
+ 0x6D, 0x16, 0xAF, 0x46, 0x1C, 0x27, 0x20, 0x1C, // block c
+ 0x01, 0xBD, 0xC5, 0x0B, 0x62, 0x3F, 0xEF, 0xEE,
+
+ 0x37, 0xae // block d
+ };
+ assertEquals(generated.length, output.length);
+ for(int i=0; i < generated.length; ++i) {
+ assertEquals("i = " + i, (byte) (generated[i] ^ data[i]), output[i]);
+ }
+
+ receiver.buffer.clear();
+ stream.changeIv(CryptoUtils.createIvForStream(aes128, name, 19));
+
+ data = new byte[]{0x47, 0x77, 0x65, 0x6e};
+ stream.write(data);
+ stream.flush();
+ output = receiver.buffer.get();
+ generated = new int[]{0x16, 0x03, 0xE6, 0xC3};
+ assertEquals(generated.length, output.length);
+ for(int i=0; i < generated.length; ++i) {
+ assertEquals("i = " + i, (byte) (generated[i] ^ data[i]), output[i]);
+ }
+ }
+
+ @Test
+ public void testCompression256Encryption() throws Exception {
+ // disable test if AES_256 is not available
+ Assume.assumeTrue(TEST_AES_256);
+ TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+ EncryptionAlgorithm aes256 = EncryptionAlgorithm.AES_256;
+ byte[] keyBytes = new byte[aes256.keyLength()];
+ for(int i=0; i < keyBytes.length; ++i) {
+ keyBytes[i] = (byte) (i * 13);
+ }
+ Key material = new SecretKeySpec(keyBytes, aes256.getAlgorithm());
+ StreamName name = new StreamName(0x1, OrcProto.Stream.Kind.DATA);
+ byte[] iv = CryptoUtils.createIvForStream(aes256, name, 0);
+ StreamOptions options = new StreamOptions(1024)
+ .withCodec(new ZlibCodec())
+ .withEncryption(aes256, material, iv);
+ OutStream stream = new OutStream("test", options, receiver);
+ for(int i=0; i < 10000; ++i) {
+ stream.write(("The Cheesy Poofs " + i + "\n")
+ .getBytes(StandardCharsets.UTF_8));
+ }
+ stream.flush();
+ // get the compressed, encrypted data
+ byte[] encrypted = receiver.buffer.get();
+
+ // decrypt it
+ Cipher decrypt = aes256.createCipher();
+ decrypt.init(Cipher.DECRYPT_MODE, material, new IvParameterSpec(iv));
+ byte[] compressed = decrypt.doFinal(encrypted);
+
+ // use InStream to decompress it
+ BufferChunkList ranges = new BufferChunkList();
+ ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
+ InStream decompressedStream = InStream.create(name.toString(), ranges.get(),
+ compressed.length,
+ InStream.options().withCodec(new ZlibCodec()).withBufferSize(1024));
+
+ // check the contents of the decompressed stream
+ BufferedReader reader
+ = new BufferedReader(new InputStreamReader(decompressedStream));
+ for(int i=0; i < 10000; ++i) {
+ assertEquals("i = " + i, "The Cheesy Poofs " + i, reader.readLine());
+ }
+ assertEquals(null, reader.readLine());
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
index a14bef1..c0b22fa 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -47,7 +47,7 @@ public class TestRunLengthByteReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
- new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 100));
+ new BufferChunk(inBuf, 0), inBuf.remaining()));
for(int i=0; i < 2048; ++i) {
int x = in.next() & 0xff;
if (i < 1024) {
@@ -89,7 +89,8 @@ public class TestRunLengthByteReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
- new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), codec, 500));
+ new BufferChunk(inBuf, 0), inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(500)));
for(int i=0; i < 2048; ++i) {
int x = in.next() & 0xff;
if (i < 1024) {
@@ -126,7 +127,7 @@ public class TestRunLengthByteReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
- new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 100));
+ new BufferChunk(inBuf, 0), inBuf.remaining()));
for(int i=0; i < 2048; i += 10) {
int x = in.next() & 0xff;
if (i < 1024) {
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
index 28239ba..f067f83 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -55,8 +55,8 @@ public class TestRunLengthIntegerReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
- ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
- codec, 1000), true);
+ ("test", new BufferChunk(inBuf, 0), inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(1000)), true);
for(int i=0; i < 2048; ++i) {
int x = (int) in.next();
if (i < 1024) {
@@ -107,8 +107,7 @@ public class TestRunLengthIntegerReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
- ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
- null, 100), true);
+ ("test", new BufferChunk(inBuf, 0), inBuf.remaining()), true);
for(int i=0; i < 2048; i += 10) {
int x = (int) in.next();
if (i < 1024) {
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index d203415..f9a925b 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,7 +34,6 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -1543,14 +1542,13 @@ public class TestSchemaEvolution {
OrcProto.Stream.Kind kind,
int... values) throws IOException {
StreamName name = new StreamName(id, kind);
- List<DiskRange> ranges = new ArrayList<>();
+ BufferChunkList ranges = new BufferChunkList();
byte[] buffer = new byte[values.length];
for(int i=0; i < values.length; ++i) {
buffer[i] = (byte) values[i];
}
ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), 0));
- streams.put(name, InStream.create(name.toString(), ranges, values.length, null,
- values.length));
+ streams.put(name, InStream.create(name.toString(), ranges.get(), values.length));
}
@Test
[2/2] orc git commit: ORC-251: Extend InStream and OutStream to
support encryption.
Posted by om...@apache.org.
ORC-251: Extend InStream and OutStream to support encryption.
Fixes #278
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/edbb9673
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/edbb9673
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/edbb9673
Branch: refs/heads/master
Commit: edbb9673d143247633dddcfdbea24f7869b151fe
Parents: f3dd9c1
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 9 09:36:28 2018 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Jul 13 14:28:06 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/orc/CompressionCodec.java | 5 +
.../org/apache/orc/impl/AircompressorCodec.java | 11 +-
.../org/apache/orc/impl/BufferChunkList.java | 47 ++
.../java/org/apache/orc/impl/CryptoUtils.java | 78 +++
.../src/java/org/apache/orc/impl/InStream.java | 554 ++++++++++++++-----
.../src/java/org/apache/orc/impl/OrcTail.java | 3 +-
.../src/java/org/apache/orc/impl/OutStream.java | 118 +++-
.../java/org/apache/orc/impl/ReaderImpl.java | 89 +--
.../org/apache/orc/impl/RecordReaderImpl.java | 6 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 106 ++--
.../orc/impl/SettableUncompressedStream.java | 43 --
.../java/org/apache/orc/impl/SnappyCodec.java | 3 +-
.../java/org/apache/orc/impl/WriterImpl.java | 4 +-
.../src/java/org/apache/orc/impl/ZlibCodec.java | 6 +
.../apache/orc/impl/writer/StreamOptions.java | 85 +++
.../org/apache/orc/impl/TestBitFieldReader.java | 14 +-
.../test/org/apache/orc/impl/TestBitPack.java | 5 +-
.../org/apache/orc/impl/TestCryptoUtils.java | 47 ++
.../test/org/apache/orc/impl/TestInStream.java | 217 ++++++--
.../orc/impl/TestIntegerCompressionReader.java | 15 +-
.../test/org/apache/orc/impl/TestOutStream.java | 164 +++++-
.../orc/impl/TestRunLengthByteReader.java | 7 +-
.../orc/impl/TestRunLengthIntegerReader.java | 7 +-
.../apache/orc/impl/TestSchemaEvolution.java | 8 +-
24 files changed, 1230 insertions(+), 412 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index dd517b3..1d2af57 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -69,4 +69,9 @@ public interface CompressionCodec {
/** Closes the codec, releasing the resources. */
void close();
+
+ /**
+ * Get the compression kind.
+ */
+ CompressionKind getKind();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index 39d678c..2609d26 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -21,16 +21,20 @@ package org.apache.orc.impl;
import io.airlift.compress.Compressor;
import io.airlift.compress.Decompressor;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
public class AircompressorCodec implements CompressionCodec {
+ private final CompressionKind kind;
private final Compressor compressor;
private final Decompressor decompressor;
- AircompressorCodec(Compressor compressor, Decompressor decompressor) {
+ AircompressorCodec(CompressionKind kind, Compressor compressor,
+ Decompressor decompressor) {
+ this.kind = kind;
this.compressor = compressor;
this.decompressor = decompressor;
}
@@ -109,4 +113,9 @@ public class AircompressorCodec implements CompressionCodec {
public void close() {
// Nothing to do.
}
+
+ @Override
+ public CompressionKind getKind() {
+ return kind;
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
new file mode 100644
index 0000000..d8a89db
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * Builds a doubly-linked list of buffer chunks.
+ */
+public class BufferChunkList {
+ private BufferChunk head;
+ private BufferChunk tail;
+
+ public void add(BufferChunk value) {
+ if (head == null) {
+ head = value;
+ tail = value;
+ } else {
+ tail.next = value;
+ value.prev = tail;
+ tail = value;
+ }
+ }
+
+ public BufferChunk get() {
+ return head;
+ }
+
+ public void clear() {
+ head = null;
+ tail = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/CryptoUtils.java b/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
new file mode 100644
index 0000000..072b06b
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.EncryptionAlgorithm;
+import java.security.SecureRandom;
+
+/**
+ * This class has routines to work with encryption within ORC files.
+ */
+public class CryptoUtils {
+
+ private static final int COLUMN_ID_LENGTH = 3;
+ private static final int KIND_LENGTH = 2;
+ private static final int STRIPE_ID_LENGTH = 3;
+ private static final int MIN_COUNT_BYTES = 8;
+
+ static final int MAX_COLUMN = 0xffffff;
+ static final int MAX_KIND = 0xffff;
+ static final int MAX_STRIPE = 0xffffff;
+
+ /**
+ * Create a unique IV for each stream within a single key.
+ * The top bytes are set with the column, stream kind, and stripe id and the
+ * lower 8 bytes are always 0.
+ * @param name the stream name
+ * @param stripeId the stripe id
+ * @return the iv for the stream
+ */
+ public static byte[] createIvForStream(EncryptionAlgorithm algorithm,
+ StreamName name,
+ int stripeId) {
+ byte[] iv = new byte[algorithm.getIvLength()];
+ int columnId = name.getColumn();
+ if (columnId < 0 || columnId > MAX_COLUMN) {
+ throw new IllegalArgumentException("ORC encryption is limited to " +
+ MAX_COLUMN + " columns. Value = " + columnId);
+ }
+ int k = name.getKind().getNumber();
+ if (k < 0 || k > MAX_KIND) {
+ throw new IllegalArgumentException("ORC encryption is limited to " +
+ MAX_KIND + " stream kinds. Value = " + k);
+ }
+ if (stripeId < 0 || stripeId > MAX_STRIPE){
+ throw new IllegalArgumentException("ORC encryption is limited to " +
+ MAX_STRIPE + " stripes. Value = " + stripeId);
+ }
+ // the rest of the iv is used for counting within the stream
+ if (iv.length - (COLUMN_ID_LENGTH + KIND_LENGTH + STRIPE_ID_LENGTH) < MIN_COUNT_BYTES) {
+ throw new IllegalArgumentException("Not enough space in the iv for the count");
+ }
+ iv[0] = (byte)(columnId >> 16);
+ iv[1] = (byte)(columnId >> 8);
+ iv[2] = (byte)columnId;
+ iv[COLUMN_ID_LENGTH] = (byte)(k >> 8);
+ iv[COLUMN_ID_LENGTH+1] = (byte)(k);
+ iv[COLUMN_ID_LENGTH+KIND_LENGTH] = (byte)(stripeId >> 16);
+ iv[COLUMN_ID_LENGTH+KIND_LENGTH+1] = (byte)(stripeId >> 8);
+ iv[COLUMN_ID_LENGTH+KIND_LENGTH+2] = (byte)stripeId;
+ return iv;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 94e9232..06f439e 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,17 +20,22 @@ package org.apache.orc.impl;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
import com.google.protobuf.CodedInputStream;
+import javax.crypto.Cipher;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
+
public abstract class InStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
@@ -55,66 +60,101 @@ public abstract class InStream extends InputStream {
@Override
public abstract void close();
+ static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
+ int result = 0;
+ DiskRangeList range = list;
+ while (range != null && range != current) {
+ result += 1;
+ range = range.next;
+ }
+ return result;
+ }
+
+ /**
+ * Implements a stream over an uncompressed stream.
+ */
public static class UncompressedStream extends InStream {
- private List<DiskRange> bytes;
+ private DiskRangeList bytes;
private long length;
protected long currentOffset;
- private ByteBuffer range;
- private int currentRange;
+ protected ByteBuffer decrypted;
+ protected DiskRangeList currentRange;
+
+ /**
+ * Create the stream without calling reset on it.
+ * This is used for the subclass that needs to do more setup.
+ * @param name name of the stream
+ * @param length the number of bytes for the stream
+ */
+ public UncompressedStream(String name, long length) {
+ super(name, length);
+ }
- public UncompressedStream(String name, List<DiskRange> input, long length) {
+ public UncompressedStream(String name,
+ DiskRangeList input,
+ long length) {
super(name, length);
reset(input, length);
}
- protected void reset(List<DiskRange> input, long length) {
+ protected void reset(DiskRangeList input, long length) {
this.bytes = input;
this.length = length;
- currentRange = 0;
- currentOffset = 0;
- range = null;
+ currentOffset = input == null ? 0 : input.getOffset();
+ setCurrent(input, true);
}
@Override
public int read() {
- if (range == null || range.remaining() == 0) {
+ if (decrypted == null || decrypted.remaining() == 0) {
if (currentOffset == length) {
return -1;
}
- seek(currentOffset);
+ setCurrent(currentRange.next, false);
}
currentOffset += 1;
- return 0xff & range.get();
+ return 0xff & decrypted.get();
+ }
+
+ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ decrypted = newRange.getData().slice();
+ // Move the position in the ByteBuffer to match the currentOffset,
+ // which is relative to the stream.
+ decrypted.position((int) (currentOffset - newRange.getOffset()));
+ }
}
@Override
public int read(byte[] data, int offset, int length) {
- if (range == null || range.remaining() == 0) {
+ if (decrypted == null || decrypted.remaining() == 0) {
if (currentOffset == this.length) {
return -1;
}
- seek(currentOffset);
+ setCurrent(currentRange.next, false);
}
- int actualLength = Math.min(length, range.remaining());
- range.get(data, offset, actualLength);
+ int actualLength = Math.min(length, decrypted.remaining());
+ decrypted.get(data, offset, actualLength);
currentOffset += actualLength;
return actualLength;
}
@Override
public int available() {
- if (range != null && range.remaining() > 0) {
- return range.remaining();
+ if (decrypted != null && decrypted.remaining() > 0) {
+ return decrypted.remaining();
}
return (int) (length - currentOffset);
}
@Override
public void close() {
- currentRange = bytes.size();
+ currentRange = null;
currentOffset = length;
// explicit de-ref of bytes[]
- bytes.clear();
+ decrypted = null;
+ bytes = null;
}
@Override
@@ -122,45 +162,37 @@ public abstract class InStream extends InputStream {
seek(index.getNext());
}
- public void seek(long desired) {
- if (desired == 0 && bytes.isEmpty()) {
+ public void seek(long desired) throws IOException {
+ if (desired == 0 && bytes == null) {
return;
}
- int i = 0;
- for (DiskRange curRange : bytes) {
- if (curRange.getOffset() <= desired &&
- (desired - curRange.getOffset()) < curRange.getLength()) {
- currentOffset = desired;
- currentRange = i;
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
- }
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+ // If we are seeking inside of the current range, just reposition.
+ if (currentRange != null && desired >= currentRange.getOffset() &&
+ desired < currentRange.getEnd()) {
+ decrypted.position((int) (desired - currentRange.getOffset()));
currentOffset = desired;
- currentRange = segments - 1;
- DiskRange curRange = bytes.get(currentRange);
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
+ } else {
+ for (DiskRangeList curRange = bytes; curRange != null;
+ curRange = curRange.next) {
+ if (curRange.getOffset() <= desired &&
+ (curRange.next == null ? desired <= curRange.getEnd() :
+ desired < curRange.getEnd())) {
+ currentOffset = desired;
+ setCurrent(curRange, true);
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Seek in " + name + " to " +
+ desired + " is outside of the data");
}
- throw new IllegalArgumentException("Seek in " + name + " to " +
- desired + " is outside of the data");
}
@Override
public String toString() {
return "uncompressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+ " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
+ " offset: " + (decrypted == null ? 0 : decrypted.position()) +
+ " limit: " + (decrypted == null ? 0 : decrypted.limit());
}
}
@@ -173,33 +205,208 @@ public abstract class InStream extends InputStream {
}
}
+ /**
+ * Manage the state of the decryption, including the ability to seek.
+ */
+ static class EncryptionState {
+ private final String name;
+ private final EncryptionAlgorithm algorithm;
+ private final Key key;
+ private final byte[] iv;
+ private final Cipher cipher;
+ private ByteBuffer decrypted;
+
+ EncryptionState(String name, StreamOptions options) {
+ this.name = name;
+ algorithm = options.algorithm;
+ key = options.key;
+ iv = options.iv;
+ cipher = algorithm.createCipher();
+ }
+
+ /**
+ * We are seeking to a new range, so update the cipher to change the IV
+ * to match. This code assumes that we only support encryption in CTR mode.
+ * @param offset where we are seeking to in the stream
+ */
+ void changeIv(long offset) {
+ int blockSize = cipher.getBlockSize();
+ long encryptionBlocks = offset / blockSize;
+ long extra = offset % blockSize;
+ byte[] advancedIv;
+ if (encryptionBlocks == 0) {
+ advancedIv = iv;
+ } else {
+ // Add the encryption blocks into the initial iv, to compensate for
+ // skipping over decrypting those bytes.
+ advancedIv = new byte[iv.length];
+ System.arraycopy(iv, 0, advancedIv, 0, iv.length);
+ int posn = iv.length - 1;
+ while (encryptionBlocks > 0) {
+ long sum = (advancedIv[posn] & 0xff) + encryptionBlocks;
+ advancedIv[posn--] = (byte) sum;
+ encryptionBlocks = sum / 0x100;
+ }
+ }
+ try {
+ cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(advancedIv));
+ // If the range starts at an offset that doesn't match the encryption
+ // block, we need to advance some bytes within an encryption block.
+ if (extra > 0) {
+ byte[] wasted = new byte[(int) extra];
+ cipher.update(wasted, 0, wasted.length, wasted, 0);
+ }
+ } catch (InvalidKeyException e) {
+ throw new IllegalArgumentException("Invalid key on " + name, e);
+ } catch (InvalidAlgorithmParameterException e) {
+ throw new IllegalArgumentException("Invalid iv on " + name, e);
+ } catch (ShortBufferException e) {
+ throw new IllegalArgumentException("Short buffer in " + name, e);
+ }
+ }
+
+ /**
+ * Decrypt the given range into the decrypted buffer. It is assumed that
+ * the cipher is correctly initialized by changeIv before this is called.
+ * @param newRange the range to decrypt
+ * @return the decrypted data, in a ByteBuffer that is reused by each call to decrypt
+ */
+ ByteBuffer decrypt(DiskRangeList newRange) {
+ final long offset = newRange.getOffset();
+ final int length = newRange.getLength();
+ if (decrypted == null || decrypted.capacity() < length) {
+ decrypted = ByteBuffer.allocate(length);
+ } else {
+ decrypted.clear();
+ }
+ ByteBuffer encrypted = newRange.getData().duplicate();
+ try {
+ int output = cipher.update(encrypted, decrypted);
+ if (output != length) {
+ throw new IllegalArgumentException("Problem decrypting " + name +
+ " at " + offset);
+ }
+ } catch (ShortBufferException e) {
+ throw new IllegalArgumentException("Problem decrypting " + name +
+ " at " + offset, e);
+ }
+ decrypted.flip();
+ return decrypted;
+ }
+
+ void close() {
+ decrypted = null;
+ }
+ }
+
+ /**
+ * Implements a stream over an encrypted, but uncompressed stream.
+ */
+ public static class EncryptedStream extends UncompressedStream {
+ private final EncryptionState encrypt;
+
+ public EncryptedStream(String name, DiskRangeList input, long length,
+ StreamOptions options) {
+ super(name, length);
+ encrypt = new EncryptionState(name, options);
+ reset(input, length);
+ }
+
+ @Override
+ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ if (isJump) {
+ encrypt.changeIv(newRange.getOffset());
+ }
+ decrypted = encrypt.decrypt(newRange);
+ decrypted.position((int) (currentOffset - newRange.getOffset()));
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ encrypt.close();
+ }
+
+ @Override
+ public String toString() {
+ return "encrypted " + super.toString();
+ }
+ }
+
private static class CompressedStream extends InStream {
- private final List<DiskRange> bytes;
+ private DiskRangeList bytes;
private final int bufferSize;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
- private ByteBuffer compressed;
- private long currentOffset;
- private int currentRange;
+ protected ByteBuffer compressed;
+ protected long currentOffset;
+ protected DiskRangeList currentRange;
private boolean isUncompressedOriginal;
- public CompressedStream(String name, List<DiskRange> input, long length,
- CompressionCodec codec, int bufferSize) {
+ /**
+ * Create the stream without resetting the input stream.
+ * This is used in subclasses so they can finish initializing before
+ * reset is called.
+ * @param name the name of the stream
+ * @param length the total number of bytes in the stream
+ * @param options the options used to read the stream
+ */
+ public CompressedStream(String name,
+ long length,
+ StreamOptions options) {
super(name, length);
- this.bytes = input;
- this.codec = codec;
- this.bufferSize = bufferSize;
- currentOffset = 0;
- currentRange = 0;
+ this.codec = options.codec;
+ this.bufferSize = options.bufferSize;
+ }
+
+ /**
+ * Create the stream and initialize the input for the stream.
+ * @param name the name of the stream
+ * @param input the input data
+ * @param length the total length of the stream
+ * @param options the options to read the data with
+ */
+ public CompressedStream(String name,
+ DiskRangeList input,
+ long length,
+ StreamOptions options) {
+ super(name, length);
+ this.codec = options.codec;
+ this.bufferSize = options.bufferSize;
+ reset(input, length);
+ }
+
+ /**
+ * Reset the input to a new set of data.
+ * @param input the input data
+ * @param length the number of bytes in the stream
+ */
+ void reset(DiskRangeList input, long length) {
+ bytes = input;
+ this.length = length;
+ currentOffset = input == null ? 0 : input.getOffset();
+ setCurrent(input, true);
}
private void allocateForUncompressed(int size, boolean isDirect) {
uncompressed = allocateBuffer(size, isDirect);
}
+ protected void setCurrent(DiskRangeList newRange,
+ boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ compressed = newRange.getData().slice();
+ compressed.position((int) (currentOffset - newRange.getOffset()));
+ }
+ }
+
private void readHeader() throws IOException {
if (compressed == null || compressed.remaining() <= 0) {
- seek(currentOffset);
+ setCurrent(currentRange.next, false);
}
if (compressed.remaining() > OutStream.HEADER_SIZE) {
int b0 = compressed.get() & 0xff;
@@ -277,9 +484,9 @@ public abstract class InStream extends InputStream {
public void close() {
uncompressed = null;
compressed = null;
- currentRange = bytes.size();
+ currentRange = null;
currentOffset = length;
- bytes.clear();
+ bytes = null;
}
@Override
@@ -299,6 +506,7 @@ public abstract class InStream extends InputStream {
/* slices a read only contiguous buffer of chunkLength */
private ByteBuffer slice(int chunkLength) throws IOException {
int len = chunkLength;
+ final DiskRangeList oldRange = currentRange;
final long oldOffset = currentOffset;
ByteBuffer slice;
if (compressed.remaining() >= len) {
@@ -308,7 +516,7 @@ public abstract class InStream extends InputStream {
currentOffset += len;
compressed.position(compressed.position() + len);
return slice;
- } else if (currentRange >= (bytes.size() - 1)) {
+ } else if (currentRange.next == null) {
// nothing has been modified yet
throw new IOException("EOF in " + this + " while trying to read " +
chunkLength + " bytes");
@@ -326,21 +534,19 @@ public abstract class InStream extends InputStream {
currentOffset += compressed.remaining();
len -= compressed.remaining();
copy.put(compressed);
- ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
- while (len > 0 && iter.hasNext()) {
- ++currentRange;
+ while (currentRange.next != null) {
+ setCurrent(currentRange.next, false);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
}
- DiskRange range = iter.next();
- compressed = range.getData().duplicate();
if (compressed.remaining() >= len) {
slice = compressed.slice();
slice.limit(len);
copy.put(slice);
currentOffset += len;
compressed.position(compressed.position() + len);
+ copy.flip();
return copy;
}
currentOffset += compressed.remaining();
@@ -349,37 +555,24 @@ public abstract class InStream extends InputStream {
}
// restore offsets for exception clarity
- seek(oldOffset);
+ currentOffset = oldOffset;
+ setCurrent(oldRange, true);
throw new IOException("EOF in " + this + " while trying to read " +
chunkLength + " bytes");
}
- private void seek(long desired) throws IOException {
- if (desired == 0 && bytes.isEmpty()) {
+ void seek(long desired) throws IOException {
+ if (desired == 0 && bytes == null) {
return;
}
- int i = 0;
- for (DiskRange range : bytes) {
- if (range.getOffset() <= desired && desired < range.getEnd()) {
- currentRange = i;
- compressed = range.getData().duplicate();
- int pos = compressed.position();
- pos += (int)(desired - range.getOffset());
- compressed.position(pos);
+ for (DiskRangeList range = bytes; range != null; range = range.next) {
+ if (range.getOffset() <= desired &&
+ (range.next == null ? desired <= range.getEnd() :
+ desired < range.getEnd())) {
currentOffset = desired;
+ setCurrent(range, true);
return;
}
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
- DiskRange range = bytes.get(segments - 1);
- currentRange = segments - 1;
- compressed = range.getData().duplicate();
- compressed.position(compressed.limit());
- currentOffset = desired;
- return;
}
throw new IOException("Seek outside of data in " + this + " to " + desired);
}
@@ -387,12 +580,16 @@ public abstract class InStream extends InputStream {
private String rangeString() {
StringBuilder builder = new StringBuilder();
int i = 0;
- for (DiskRange range : bytes) {
+ for (DiskRangeList range = bytes; range != null; range = range.next){
if (i != 0) {
builder.append("; ");
}
- builder.append(" range " + i + " = " + range.getOffset()
- + " to " + (range.getEnd() - range.getOffset()));
+ builder.append(" range ");
+ builder.append(i);
+ builder.append(" = ");
+ builder.append(range.getOffset());
+ builder.append(" to ");
+ builder.append(range.getEnd());
++i;
}
return builder.toString();
@@ -401,8 +598,9 @@ public abstract class InStream extends InputStream {
@Override
public String toString() {
return "compressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+ " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
+ " offset: " + (compressed == null ? 0 : compressed.position()) +
+ " limit: " + (compressed == null ? 0 : compressed.limit()) +
rangeString() +
(uncompressed == null ? "" :
" uncompressed: " + uncompressed.position() + " to " +
@@ -410,33 +608,116 @@ public abstract class InStream extends InputStream {
}
}
+ private static class EncryptedCompressedStream extends CompressedStream {
+ private final EncryptionState encrypt;
+
+ public EncryptedCompressedStream(String name,
+ DiskRangeList input,
+ long length,
+ StreamOptions options) {
+ super(name, length, options);
+ encrypt = new EncryptionState(name, options);
+ reset(input, length);
+ }
+
+ @Override
+ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ if (isJump) {
+ encrypt.changeIv(newRange.getOffset());
+ }
+ compressed = encrypt.decrypt(newRange);
+ compressed.position((int) (currentOffset - newRange.getOffset()));
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ encrypt.close();
+ }
+
+ @Override
+ public String toString() {
+ return "encrypted " + super.toString();
+ }
+ }
+
public abstract void seek(PositionProvider index) throws IOException;
+ public static class StreamOptions implements Cloneable {
+ private CompressionCodec codec;
+ private int bufferSize;
+ private EncryptionAlgorithm algorithm;
+ private Key key;
+ private byte[] iv;
+
+ public StreamOptions withCodec(CompressionCodec value) {
+ this.codec = value;
+ return this;
+ }
+
+ public StreamOptions withBufferSize(int value) {
+ bufferSize = value;
+ return this;
+ }
+
+ public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
+ Key key,
+ byte[] iv) {
+ this.algorithm = algorithm;
+ this.key = key;
+ this.iv = iv;
+ return this;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+
+ @Override
+ public StreamOptions clone() {
+ try {
+ StreamOptions clone = (StreamOptions) super.clone();
+ if (clone.codec != null) {
+ // Make sure we don't share the same codec between two readers.
+ clone.codec = OrcCodecPool.getCodec(codec.getKind());
+ }
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new UnsupportedOperationException("uncloneable", e);
+ }
+ }
+ }
+
+ public static StreamOptions options() {
+ return new StreamOptions();
+ }
+
/**
- * Create an input stream from a list of buffers.
- * @param streamName the name of the stream
- * @param buffers the list of ranges of bytes for the stream
- * @param offsets a list of offsets (the same length as input) that must
- * contain the first offset of the each set of bytes in input
+ * Create an input stream from a list of disk ranges with data.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
* @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
+ * @param options the options to read with
* @return an input stream
- * @throws IOException
*/
- //@VisibleForTesting
- @Deprecated
- public static InStream create(String streamName,
- ByteBuffer[] buffers,
- long[] offsets,
+ public static InStream create(String name,
+ DiskRangeList input,
long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
- for (int i = 0; i < buffers.length; ++i) {
- input.add(new BufferChunk(buffers[i], offsets[i]));
+ StreamOptions options) {
+ if (options == null || options.codec == null) {
+ if (options == null || options.key == null) {
+ return new UncompressedStream(name, input, length);
+ } else {
+ return new EncryptedStream(name, input, length, options);
+ }
+ } else if (options.key == null) {
+ return new CompressedStream(name, input, length, options);
+ } else {
+ return new EncryptedCompressedStream(name, input, length, options);
}
- return create(streamName, input, length, codec, bufferSize);
}
/**
@@ -444,41 +725,22 @@ public abstract class InStream extends InputStream {
* @param name the name of the stream
* @param input the list of ranges of bytes for the stream; from disk or cache
* @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
* @return an input stream
- * @throws IOException
*/
public static InStream create(String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- if (codec == null) {
- return new UncompressedStream(name, input, length);
- } else {
- return new CompressedStream(name, input, length, codec, bufferSize);
- }
+ DiskRangeList input,
+ long length) throws IOException {
+ return create(name, input, length, null);
}
/**
- * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
+ * Creates coded input stream (used for protobuf message parsing) with higher
+ * message size limit.
*
- * @param name the name of the stream
- * @param input the list of ranges of bytes for the stream; from disk or cache
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
+ * @param inStream the stream to wrap.
* @return coded input stream
- * @throws IOException
*/
- public static CodedInputStream createCodedInputStream(
- String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- InStream inStream = create(name, input, length, codec, bufferSize);
+ public static CodedInputStream createCodedInputStream(InStream inStream) {
CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
return codedInputStream;
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java
index 3c78874..9e8a5f2 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcTail.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java
@@ -109,7 +109,8 @@ public final class OrcTail {
CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind());
try {
metadata = extractMetadata(serializedTail, 0,
- (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize());
+ (int) fileTail.getPostscript().getMetadataLength(),
+ InStream.options().withCodec(codec).withBufferSize(getCompressionBufferSize()));
} finally {
OrcCodecPool.returnCodec(getCompressionKind(), codec);
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index d6302d5..435f43c 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,13 +18,26 @@
package org.apache.orc.impl;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+/**
+ * The output stream for writing to ORC files.
+ * It handles both compression and encryption.
+ */
public class OutStream extends PositionedOutputStream {
-
public static final int HEADER_SIZE = 3;
private final String name;
private final PhysicalWriter.OutputReceiver receiver;
@@ -55,19 +68,89 @@ public class OutStream extends PositionedOutputStream {
private final CompressionCodec codec;
private long compressedBytes = 0;
private long uncompressedBytes = 0;
+ private final Cipher cipher;
+ private final Key key;
public OutStream(String name,
int bufferSize,
CompressionCodec codec,
- PhysicalWriter.OutputReceiver receiver) throws IOException {
+ PhysicalWriter.OutputReceiver receiver) {
+ this(name, new StreamOptions(bufferSize).withCodec(codec), receiver);
+ }
+
+ public OutStream(String name,
+ StreamOptions options,
+ PhysicalWriter.OutputReceiver receiver) {
this.name = name;
- this.bufferSize = bufferSize;
- this.codec = codec;
+ this.bufferSize = options.getBufferSize();
+ this.codec = options.getCodec();
this.receiver = receiver;
+ if (options.isEncrypted()) {
+ this.cipher = options.getAlgorithm().createCipher();
+ this.key = options.getKey();
+ changeIv(options.getIv());
+ } else {
+ this.cipher = null;
+ this.key = null;
+ }
}
- public void clear() throws IOException {
- flush();
+ /**
+ * Change the current Initialization Vector (IV) for the encryption.
+ */
+ void changeIv(byte[] newIv) {
+ try {
+ cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(newIv));
+ } catch (InvalidKeyException e) {
+ throw new IllegalStateException("ORC bad encryption key for " +
+ toString(), e);
+ } catch (InvalidAlgorithmParameterException e) {
+ throw new IllegalStateException("ORC bad encryption parameter for " +
+ toString(), e);
+ }
+ }
+
+ /**
+ * When a buffer is done, we send it to the receiver to store.
+ * If we are encrypting, encrypt the buffer before we pass it on.
+ * @param buffer the buffer to store
+ */
+ void outputBuffer(ByteBuffer buffer) throws IOException {
+ if (cipher != null) {
+ ByteBuffer output = buffer.duplicate();
+ int len = buffer.remaining();
+ try {
+ int encrypted = cipher.update(buffer, output);
+ output.flip();
+ receiver.output(output);
+ if (encrypted != len) {
+ throw new IllegalArgumentException("Encryption of incomplete buffer "
+ + len + " -> " + encrypted + " in " + toString());
+ }
+ } catch (ShortBufferException e) {
+ throw new IOException("Short buffer in encryption in " + toString(), e);
+ }
+ } else {
+ receiver.output(buffer);
+ }
+ }
+
+ /**
+ * Ensure that the cipher didn't save any data.
+ * The next call should be to changeIv to restart the encryption on a new IV.
+ */
+ void finishEncryption() {
+ try {
+ byte[] finalBytes = cipher.doFinal();
+ if (finalBytes != null && finalBytes.length != 0) {
+ throw new IllegalStateException("We shouldn't have remaining bytes " +
+ toString());
+ }
+ } catch (IllegalBlockSizeException e) {
+ throw new IllegalArgumentException("Bad block size", e);
+ } catch (BadPaddingException e) {
+ throw new IllegalArgumentException("Bad padding", e);
+ }
}
/**
@@ -90,7 +173,7 @@ public class OutStream extends PositionedOutputStream {
buffer.put(position + 2, (byte) (val >> 15));
}
- private void getNewInputBuffer() throws IOException {
+ private void getNewInputBuffer() {
if (codec == null) {
current = ByteBuffer.allocate(bufferSize);
} else {
@@ -117,11 +200,11 @@ public class OutStream extends PositionedOutputStream {
/**
* Allocate a new output buffer if we are compressing.
*/
- private ByteBuffer getNewOutputBuffer() throws IOException {
+ private ByteBuffer getNewOutputBuffer() {
return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
}
- private void flip() throws IOException {
+ private void flip() {
current.limit(current.position());
current.position(codec == null ? 0 : HEADER_SIZE);
}
@@ -165,7 +248,7 @@ public class OutStream extends PositionedOutputStream {
}
flip();
if (codec == null) {
- receiver.output(current);
+ outputBuffer(current);
getNewInputBuffer();
} else {
if (compressed == null) {
@@ -190,7 +273,7 @@ public class OutStream extends PositionedOutputStream {
// if we have less than the next header left, spill it.
if (compressed.remaining() < HEADER_SIZE) {
compressed.flip();
- receiver.output(compressed);
+ outputBuffer(compressed);
compressed = overflow;
overflow = null;
}
@@ -203,7 +286,7 @@ public class OutStream extends PositionedOutputStream {
if (sizePosn != 0) {
compressed.position(sizePosn);
compressed.flip();
- receiver.output(compressed);
+ outputBuffer(compressed);
compressed = null;
// if we have an overflow, clear it and make it the new compress
// buffer
@@ -223,14 +306,14 @@ public class OutStream extends PositionedOutputStream {
current.position(0);
// update the header with the current length
writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
- receiver.output(current);
+ outputBuffer(current);
getNewInputBuffer();
}
}
}
@Override
- public void getPosition(PositionRecorder recorder) throws IOException {
+ public void getPosition(PositionRecorder recorder) {
if (codec == null) {
recorder.addPosition(uncompressedBytes);
} else {
@@ -244,7 +327,10 @@ public class OutStream extends PositionedOutputStream {
spill();
if (compressed != null && compressed.position() != 0) {
compressed.flip();
- receiver.output(compressed);
+ outputBuffer(compressed);
+ }
+ if (cipher != null) {
+ finishEncryption();
}
compressed = null;
uncompressedBytes = 0;
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index bba580f..3919988 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcProto;
@@ -275,31 +274,6 @@ public class ReaderImpl implements Reader {
}
/**
- * Ensure this is an ORC file to prevent users from trying to read text
- * files or RC files as ORC files.
- * @param psLen the postscript length
- * @param buffer the tail of the file
- */
- protected static void ensureOrcFooter(ByteBuffer buffer, int psLen) throws IOException {
- int magicLength = OrcFile.MAGIC.length();
- int fullLength = magicLength + 1;
- if (psLen < fullLength || buffer.remaining() < fullLength) {
- throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
- }
-
- int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
- byte[] array = buffer.array();
- // now look for the magic string at the end of the postscript.
- if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
- // if it isn't there, this may be 0.11.0 version of the ORC file.
- // Read the first 3 bytes from the buffer to check for the header
- if (!Text.decode(buffer.array(), 0, magicLength).equals(OrcFile.MAGIC)) {
- throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
- }
- }
- }
-
- /**
* Build a version string out of an array.
* @param version the version number as a list
* @return the human readable form of the version string
@@ -405,26 +379,20 @@ public class ReaderImpl implements Reader {
return OrcFile.WriterVersion.FUTURE;
}
- static List<DiskRange> singleton(DiskRange item) {
- List<DiskRange> result = new ArrayList<>();
- result.add(item);
- return result;
- }
-
private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
- int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+ int footerSize, InStream.StreamOptions options) throws IOException {
bb.position(footerAbsPos);
bb.limit(footerAbsPos + footerSize);
- return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
- singleton(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+ return OrcProto.Footer.parseFrom(InStream.createCodedInputStream(
+ InStream.create("footer", new BufferChunk(bb, 0), footerSize, options)));
}
public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
- int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+ int metadataSize, InStream.StreamOptions options) throws IOException {
bb.position(metadataAbsPos);
bb.limit(metadataAbsPos + metadataSize);
- return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
- singleton(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+ return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream(
+ InStream.create("metadata", new BufferChunk(bb, 0), metadataSize, options)));
}
private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
@@ -450,41 +418,6 @@ public class ReaderImpl implements Reader {
return ps;
}
- public static OrcTail extractFileTail(ByteBuffer buffer)
- throws IOException {
- return extractFileTail(buffer, -1, -1);
- }
-
- public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long modificationTime)
- throws IOException {
- int readSize = buffer.limit();
- int psLen = buffer.get(readSize - 1) & 0xff;
- int psOffset = readSize - 1 - psLen;
- ensureOrcFooter(buffer, psLen);
- byte[] psBuffer = new byte[psLen];
- System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
- OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
- int footerSize = (int) ps.getFooterLength();
- CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
- OrcProto.FileTail.Builder fileTailBuilder;
- CompressionCodec codec = OrcCodecPool.getCodec(kind);
- try {
- OrcProto.Footer footer = extractFooter(buffer,
- (int) (buffer.position() + ps.getMetadataLength()),
- footerSize, codec, (int) ps.getCompressionBlockSize());
- fileTailBuilder = OrcProto.FileTail.newBuilder()
- .setPostscriptLength(psLen)
- .setPostscript(ps)
- .setFooter(footer)
- .setFileLength(fileLength);
- } finally {
- OrcCodecPool.returnCodec(kind, codec);
- }
- // clear does not clear the contents but sets position to 0 and limit = capacity
- buffer.clear();
- return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
- }
-
/**
* Build a virtual OrcTail for empty files.
* @return a new OrcTail
@@ -599,7 +532,8 @@ public class ReaderImpl implements Reader {
OrcProto.Footer footer;
CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
try {
- footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize);
+ footer = extractFooter(footerBuffer, 0, footerSize,
+ InStream.options().withCodec(codec).withBufferSize(bufferSize));
} finally {
OrcCodecPool.returnCodec(compressionKind, codec);
}
@@ -788,7 +722,8 @@ public class ReaderImpl implements Reader {
if (metadata == null) {
CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
try {
- metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+ metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize,
+ InStream.options().withCodec(codec).withBufferSize(bufferSize));
} finally {
OrcCodecPool.returnCodec(compressionKind, codec);
}
@@ -803,10 +738,6 @@ public class ReaderImpl implements Reader {
return result;
}
- public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
- return userMetadata;
- }
-
@Override
public List<Integer> getVersionList() {
return versionList;
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 8bb4d9a..a8e0be1 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -1160,6 +1160,8 @@ public class RecordReaderImpl implements RecordReader {
int bufferSize,
Map<StreamName, InStream> streams) throws IOException {
long streamOffset = 0;
+ InStream.StreamOptions options = InStream.options().withCodec(codec)
+ .withBufferSize(bufferSize);
for (OrcProto.Stream streamDesc : streamDescriptions) {
int column = streamDesc.getColumn();
if ((includeColumn != null &&
@@ -1169,11 +1171,11 @@ public class RecordReaderImpl implements RecordReader {
streamOffset += streamDesc.getLength();
continue;
}
- List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+ DiskRangeList buffers = RecordReaderUtils.getStreamBuffers(
ranges, streamOffset, streamDesc.getLength());
StreamName name = new StreamName(column, streamDesc.getKind());
streams.put(name, InStream.create(name.toString(), buffers,
- streamDesc.getLength(), codec, bufferSize));
+ streamDesc.getLength(), options));
streamOffset += streamDesc.getLength();
}
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 705e768..a70c988 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -150,8 +150,7 @@ public class RecordReaderUtils {
private final FileSystem fs;
private final Path path;
private final boolean useZeroCopy;
- private CompressionCodec codec;
- private final int bufferSize;
+ private InStream.StreamOptions options = InStream.options();
private final int typeCount;
private CompressionKind compressionKind;
@@ -160,8 +159,8 @@ public class RecordReaderUtils {
this.path = properties.getPath();
this.useZeroCopy = properties.getZeroCopy();
this.compressionKind = properties.getCompression();
- this.codec = OrcCodecPool.getCodec(compressionKind);
- this.bufferSize = properties.getBufferSize();
+ options.withCodec(OrcCodecPool.getCodec(compressionKind))
+ .withBufferSize(properties.getBufferSize());
this.typeCount = properties.getTypeCount();
}
@@ -171,7 +170,7 @@ public class RecordReaderUtils {
if (useZeroCopy) {
// ZCR only uses codec for boolean checks.
pool = new ByteBufferAllocatorPool();
- zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+ zcr = RecordReaderUtils.createZeroCopyShim(file, options.getCodec(), pool);
} else {
zcr = null;
}
@@ -228,9 +227,9 @@ public class RecordReaderUtils {
bb.position((int) (offset - range.getOffset()));
bb.limit((int) (bb.position() + stream.getLength()));
indexes[column] = OrcProto.RowIndex.parseFrom(
- InStream.createCodedInputStream("index",
- ReaderImpl.singleton(new BufferChunk(bb, 0)),
- stream.getLength(), codec, bufferSize));
+ InStream.createCodedInputStream(InStream.create("index",
+ new BufferChunk(bb, 0),
+ stream.getLength(), options)));
}
break;
case BLOOM_FILTER:
@@ -240,9 +239,9 @@ public class RecordReaderUtils {
bb.position((int) (offset - range.getOffset()));
bb.limit((int) (bb.position() + stream.getLength()));
bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
- (InStream.createCodedInputStream("bloom_filter",
- ReaderImpl.singleton(new BufferChunk(bb, 0)),
- stream.getLength(), codec, bufferSize));
+ (InStream.createCodedInputStream(InStream.create(
+ "bloom_filter", new BufferChunk(bb, 0),
+ stream.getLength(), options)));
}
break;
default:
@@ -266,8 +265,8 @@ public class RecordReaderUtils {
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
return OrcProto.StripeFooter.parseFrom(
- InStream.createCodedInputStream("footer", ReaderImpl.singleton(
- new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize));
+ InStream.createCodedInputStream(InStream.create("footer",
+ new BufferChunk(tailBuf, 0), tailLength, options)));
}
@Override
@@ -278,9 +277,9 @@ public class RecordReaderUtils {
@Override
public void close() throws IOException {
- if (codec != null) {
- OrcCodecPool.returnCodec(compressionKind, codec);
- codec = null;
+ if (options.getCodec() != null) {
+ OrcCodecPool.returnCodec(compressionKind, options.getCodec());
+ options.withCodec(null);
}
if (pool != null) {
pool.clear();
@@ -313,9 +312,9 @@ public class RecordReaderUtils {
}
try {
DefaultDataReader clone = (DefaultDataReader) super.clone();
- if (codec != null) {
+ if (options.getCodec() != null) {
// Make sure we don't share the same codec between two readers.
- clone.codec = OrcCodecPool.getCodec(clone.compressionKind);
+ clone.options = options.clone();
}
return clone;
} catch (CloneNotSupportedException e) {
@@ -325,7 +324,7 @@ public class RecordReaderUtils {
@Override
public CompressionCodec getCompressionCodec() {
- return codec;
+ return options.getCodec();
}
}
@@ -572,42 +571,49 @@ public class RecordReaderUtils {
}
- static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+ static DiskRangeList getStreamBuffers(DiskRangeList range, long offset,
+ long length) {
// This assumes sorted ranges (as do many other parts of ORC code).
- ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
- if (length == 0) return buffers;
- long streamEnd = offset + length;
- boolean inRange = false;
- while (range != null) {
- if (!inRange) {
- if (range.getEnd() <= offset) {
- range = range.next;
- continue; // Skip until we are in range.
+ BufferChunkList result = new BufferChunkList();
+ if (length != 0) {
+ long streamEnd = offset + length;
+ boolean inRange = false;
+ while (range != null) {
+ if (!inRange) {
+ if (range.getEnd() <= offset) {
+ range = range.next;
+ continue; // Skip until we are in range.
+ }
+ inRange = true;
+ if (range.getOffset() < offset) {
+ // Partial first buffer, add a slice of it.
+ result.add((BufferChunk) range.sliceAndShift(offset,
+ Math.min(streamEnd, range.getEnd()), -offset));
+ if (range.getEnd() >= streamEnd)
+ break; // Partial first buffer is also partial last buffer.
+ range = range.next;
+ continue;
+ }
+ } else if (range.getOffset() >= streamEnd) {
+ break;
}
- inRange = true;
- if (range.getOffset() < offset) {
- // Partial first buffer, add a slice of it.
- buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
- if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
- range = range.next;
- continue;
+ if (range.getEnd() > streamEnd) {
+ // Partial last buffer (may also be the first buffer), add a slice of it.
+ result.add((BufferChunk) range.sliceAndShift(range.getOffset(),
+ streamEnd, -offset));
+ break;
}
- } else if (range.getOffset() >= streamEnd) {
- break;
- }
- if (range.getEnd() > streamEnd) {
- // Partial last buffer (may also be the first buffer), add a slice of it.
- buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
- break;
+ // Buffer that belongs entirely to one stream.
+ // TODO: ideally we would want to reuse the object and remove it from
+ // the list, but we cannot because bufferChunks is also used by
+ // clearStreams for zcr. Create a useless dup.
+ result.add((BufferChunk) range.sliceAndShift(range.getOffset(),
+ range.getEnd(), -offset));
+ if (range.getEnd() == streamEnd) break;
+ range = range.next;
}
- // Buffer that belongs entirely to one stream.
- // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
- // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
- buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
- if (range.getEnd() == streamEnd) break;
- range = range.next;
}
- return buffers;
+ return result.get();
}
static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java b/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
deleted file mode 100644
index da92c62..0000000
--- a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.common.DiskRangeInfo;
-import org.apache.hadoop.hive.common.io.DiskRange;
-
-/**
- * An uncompressed stream whose underlying byte buffer can be set.
- */
-public class SettableUncompressedStream extends InStream.UncompressedStream {
-
- public SettableUncompressedStream(String name, List<DiskRange> input, long length) {
- super(name, input, length);
- setOffset(input);
- }
-
- public void setBuffers(DiskRangeInfo diskRangeInfo) {
- reset(diskRangeInfo.getDiskRanges(), diskRangeInfo.getTotalLength());
- setOffset(diskRangeInfo.getDiskRanges());
- }
-
- private void setOffset(List<DiskRange> list) {
- currentOffset = list.isEmpty() ? 0 : list.get(0).getOffset();
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index 452f315..9269eb6 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
import io.airlift.compress.snappy.SnappyCompressor;
import io.airlift.compress.snappy.SnappyDecompressor;
+import org.apache.orc.CompressionKind;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -32,7 +33,7 @@ public class SnappyCodec extends AircompressorCodec
HadoopShims.DirectDecompressor decompressShim = null;
SnappyCodec() {
- super(new SnappyCompressor(), new SnappyDecompressor());
+ super(CompressionKind.SNAPPY, new SnappyCompressor(), new SnappyDecompressor());
}
@Override
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d6239f2..4c3c548 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -236,10 +236,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
case SNAPPY:
return new SnappyCodec();
case LZO:
- return new AircompressorCodec(new LzoCompressor(),
+ return new AircompressorCodec(kind, new LzoCompressor(),
new LzoDecompressor());
case LZ4:
- return new AircompressorCodec(new Lz4Compressor(),
+ return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
default:
throw new IllegalArgumentException("Unknown compression codec: " +
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index de1a6bf..19a3728 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -25,6 +25,7 @@ import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
private static final HadoopShims SHIMS = HadoopShimsFactory.get();
@@ -187,4 +188,9 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
decompressShim.end();
}
}
+
+ @Override
+ public CompressionKind getKind() {
+ return CompressionKind.ZLIB;
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
new file mode 100644
index 0000000..3d0c48a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+
+import java.security.Key;
+
+/**
+ * The compression and encryption options for writing a stream.
+ */
+public class StreamOptions {
+ private CompressionCodec codec;
+ private final int bufferSize;
+ private EncryptionAlgorithm algorithm;
+ private Key key;
+ private byte[] iv;
+
+ /**
+ * An option object with the given buffer size set.
+ * @param bufferSize the size of the buffers.
+ */
+ public StreamOptions(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ /**
+ * Compress using the given codec.
+ * @param codec the codec to compress with
+ * @return this
+ */
+ public StreamOptions withCodec(CompressionCodec codec) {
+ this.codec = codec;
+ return this;
+ }
+
+ public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
+ Key key,
+ byte[] iv) {
+ this.algorithm = algorithm;
+ this.key = key;
+ this.iv = iv;
+ return this;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public boolean isEncrypted() {
+ return key != null;
+ }
+
+ public Key getKey() {
+ return key;
+ }
+
+ public EncryptionAlgorithm getAlgorithm() {
+ return algorithm;
+ }
+
+ public byte[] getIv() {
+ return iv;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index f7a2a5c..03c379e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -48,8 +48,8 @@ public class TestBitFieldReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
BitFieldReader in = new BitFieldReader(InStream.create("test",
- new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
- codec, 500));
+ new BufferChunk(inBuf, 0), inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(500)));
for(int i=0; i < COUNT; ++i) {
int x = in.next();
if (i < COUNT / 2) {
@@ -96,8 +96,8 @@ public class TestBitFieldReader {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), null, 100));
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new BufferChunk(inBuf, 0), inBuf.remaining()));
for(int i=0; i < COUNT; i += 5) {
int x = (int) in.next();
if (i < COUNT/2) {
@@ -133,8 +133,8 @@ public class TestBitFieldReader {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), null, 100));
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new BufferChunk(inBuf, 0), inBuf.remaining()));
in.seek(posn);
in.skip(10);
for(int r = 210; r < COUNT; ++r) {
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestBitPack.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index f2d3d64..3eba3e6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -108,8 +108,9 @@ public class TestBitPack {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
long[] buff = new long[SIZE];
- utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf },
- new long[] { 0 }, inBuf.remaining(), null, SIZE));
+ utils.readInts(buff, 0, SIZE, fixedWidth,
+ InStream.create("test", new BufferChunk(inBuf,0),
+ inBuf.remaining()));
for (int i = 0; i < SIZE; i++) {
buff[i] = utils.zigzagDecode(buff[i]);
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java b/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
new file mode 100644
index 0000000..203d3e7
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestCryptoUtils {
+
+ @Test
+ public void testCreateStreamIv() throws Exception {
+ byte[] iv = CryptoUtils.createIvForStream(EncryptionAlgorithm.AES_128,
+ new StreamName(0x234567,
+ OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), 0x123456);
+ assertEquals(16, iv.length);
+ assertEquals(0x23, iv[0]);
+ assertEquals(0x45, iv[1]);
+ assertEquals(0x67, iv[2]);
+ assertEquals(0x0, iv[3]);
+ assertEquals(0x8, iv[4]);
+ assertEquals(0x12, iv[5]);
+ assertEquals(0x34, iv[6]);
+ assertEquals(0x56, iv[7]);
+ }
+}