Posted to commits@orc.apache.org by om...@apache.org on 2018/07/13 21:28:49 UTC

[1/2] orc git commit: ORC-251: Extend InStream and OutStream to support encryption.

Repository: orc
Updated Branches:
  refs/heads/master f3dd9c159 -> edbb9673d


http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestInStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index a27c49a..fe7b53d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,29 +18,35 @@
 
 package org.apache.orc.impl;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.fail;
-
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.Key;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Test;
 
+import javax.crypto.spec.SecretKeySpec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class TestInStream {
 
   public static class OutputCollector implements PhysicalWriter.OutputReceiver {
     public DynamicByteArray buffer = new DynamicByteArray();
 
     @Override
-    public void output(ByteBuffer buffer) throws IOException {
+    public void output(ByteBuffer buffer) {
       this.buffer.add(buffer.array(), buffer.arrayOffset() + buffer.position(),
           buffer.remaining());
     }
@@ -53,7 +59,7 @@ public class TestInStream {
 
   static class PositionCollector
       implements PositionProvider, PositionRecorder {
-    private List<Long> positions = new ArrayList<Long>();
+    private List<Long> positions = new ArrayList<>();
     private int index = 0;
 
     @Override
@@ -101,10 +107,10 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
-        new long[]{0}, inBuf.remaining(), null, 100);
+    InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+        inBuf.remaining());
     assertEquals("uncompressed stream test position: 0 length: 1024" +
-                 " range: 0 offset: 0 limit: 0",
+                 " range: 0 offset: 0 limit: 1024",
                  in.toString());
     for(int i=0; i < 1024; ++i) {
       int x = in.read();
@@ -117,6 +123,121 @@ public class TestInStream {
   }
 
   @Test
+  public void testEncrypted() throws Exception {
+    final long DATA_CONST = 0x1_0000_0003L;
+    final int ROW_COUNT = 1024;
+    OutputCollector collect = new OutputCollector();
+    EncryptionAlgorithm algorithm = EncryptionAlgorithm.AES_128;
+    byte[] rawKey = new byte[algorithm.keyLength()];
+    for(int i=0; i < rawKey.length; ++i) {
+      rawKey[i] = (byte) i;
+    }
+    Key decryptKey = new SecretKeySpec(rawKey, algorithm.getAlgorithm());
+    StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
+    byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);
+    StreamOptions writerOptions = new StreamOptions(100)
+        .withEncryption(algorithm, decryptKey, iv);
+    OutStream out = new OutStream("test", writerOptions, collect);
+    PositionCollector[] positions = new PositionCollector[ROW_COUNT];
+    DataOutputStream outStream = new DataOutputStream(out);
+    for(int i=0; i < ROW_COUNT; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      outStream.writeLong(i * DATA_CONST);
+    }
+    out.flush();
+    assertEquals(ROW_COUNT * 8, collect.buffer.size());
+
+    // Allocate the stream into three ranges, making sure that they don't fall
+    // on the 16 byte AES boundaries.
+    int[] rangeSizes = {1965, ROW_COUNT * 8 - 1965 - 15, 15};
+    int offset = 0;
+    BufferChunkList list = new BufferChunkList();
+    for(int size: rangeSizes) {
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      collect.buffer.setByteBuffer(buffer, offset, size);
+      buffer.flip();
+      list.add(new BufferChunk(buffer, offset));
+      offset += size;
+    }
+
+    InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+        InStream.options().withEncryption(algorithm, decryptKey, iv));
+    assertEquals("encrypted uncompressed stream test position: 0 length: 8192" +
+            " range: 0 offset: 0 limit: 1965",
+        in.toString());
+    DataInputStream inputStream = new DataInputStream(in);
+    for(int i=0; i < ROW_COUNT; ++i) {
+      assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+    }
+    for(int i=ROW_COUNT - 1; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+    }
+  }
+
+  @Test
+  public void testCompressedEncrypted() throws Exception {
+    final long DATA_CONST = 0x1_0000_0003L;
+    final int ROW_COUNT = 1024;
+    OutputCollector collect = new OutputCollector();
+    EncryptionAlgorithm algorithm = EncryptionAlgorithm.AES_128;
+    byte[] rawKey = new byte[algorithm.keyLength()];
+    for(int i=0; i < rawKey.length; ++i) {
+      rawKey[i] = (byte) i;
+    }
+    Key decryptKey = new SecretKeySpec(rawKey, algorithm.getAlgorithm());
+    StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
+    byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);
+    StreamOptions writerOptions = new StreamOptions(500)
+        .withCodec(new ZlibCodec())
+        .withEncryption(algorithm, decryptKey, iv);
+    OutStream out = new OutStream("test", writerOptions, collect);
+    PositionCollector[] positions = new PositionCollector[ROW_COUNT];
+    DataOutputStream outStream = new DataOutputStream(out);
+    for(int i=0; i < ROW_COUNT; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      outStream.writeLong(i * DATA_CONST);
+    }
+    out.flush();
+    // currently 3957 bytes
+    int compressedSize = collect.buffer.size();
+
+    // Allocate the stream into three ranges, making sure that they don't fall
+    // on the 16 byte AES boundaries.
+    int[] rangeSizes = {1998, compressedSize - 1998 - 15, 15};
+    int offset = 0;
+    BufferChunkList list = new BufferChunkList();
+    for(int size: rangeSizes) {
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+      collect.buffer.setByteBuffer(buffer, offset, size);
+      buffer.flip();
+      list.add(new BufferChunk(buffer, offset));
+      offset += size;
+    }
+
+    InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+        InStream.options()
+            .withCodec(new ZlibCodec()).withBufferSize(500)
+            .withEncryption(algorithm, decryptKey, iv));
+    assertEquals("encrypted compressed stream test position: 0 length: " +
+            compressedSize + " range: 0 offset: 0 limit: 1998 range 0 = 0 to" +
+            " 1998;  range 1 = 1998 to " + (compressedSize - 15) +
+            ";  range 2 = " +
+            (compressedSize - 15) + " to " + compressedSize,
+        in.toString());
+    DataInputStream inputStream = new DataInputStream(in);
+    for(int i=0; i < ROW_COUNT; ++i) {
+      assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+    }
+    for(int i=ROW_COUNT - 1; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals("row " + i, i * DATA_CONST, inputStream.readLong());
+    }
+  }
+
+  @Test
   public void testCompressed() throws Exception {
     OutputCollector collect = new OutputCollector();
     CompressionCodec codec = new ZlibCodec();
@@ -133,10 +254,11 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
-        new long[]{0}, inBuf.remaining(), codec, 300);
+    InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+        inBuf.remaining(),
+        InStream.options().withCodec(codec).withBufferSize(300));
     assertEquals("compressed stream test position: 0 length: 961 range: 0" +
-                 " offset: 0 limit: 0 range 0 = 0 to 961",
+                 " offset: 0 limit: 961 range 0 = 0 to 961",
                  in.toString());
     for(int i=0; i < 1024; ++i) {
       int x = in.read();
@@ -166,8 +288,9 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
-        new long[]{0}, inBuf.remaining(), codec, 100);
+    InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+        inBuf.remaining(),
+        InStream.options().withCodec(codec).withBufferSize(100));
     byte[] contents = new byte[1024];
     try {
       in.read(contents);
@@ -181,8 +304,9 @@ public class TestInStream {
     inBuf.put((byte) 32);
     inBuf.put((byte) 0);
     inBuf.flip();
-    in = InStream.create("test2", new ByteBuffer[]{inBuf}, new long[]{0},
-        inBuf.remaining(), codec, 300);
+    in = InStream.create("test2", new BufferChunk(inBuf, 0),
+        inBuf.remaining(),
+        InStream.options().withCodec(codec).withBufferSize(300));
     try {
       in.read();
       fail();
@@ -214,14 +338,19 @@ public class TestInStream {
     collect.buffer.setByteBuffer(inBuf[1], 483, 1625 - 483);
     collect.buffer.setByteBuffer(inBuf[2], 1625, 1674 - 1625);
 
-    for(int i=0; i < inBuf.length; ++i) {
-      inBuf[i].flip();
+    BufferChunkList buffers = new BufferChunkList();
+    int offset = 0;
+    for(ByteBuffer buffer: inBuf) {
+      buffer.flip();
+      buffers.add(new BufferChunk(buffer, offset));
+      offset += buffer.remaining();
     }
-    InStream in = InStream.create("test", inBuf,
-        new long[]{0,483, 1625}, 1674, codec, 400);
+    InStream.StreamOptions options = InStream.options()
+        .withCodec(codec).withBufferSize(400);
+    InStream in = InStream.create("test", buffers.get(), 1674, options);
     assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
-                 " offset: 0 limit: 0 range 0 = 0 to 483;" +
-                 "  range 1 = 483 to 1142;  range 2 = 1625 to 49",
+                 " offset: 0 limit: 483 range 0 = 0 to 483;" +
+                 "  range 1 = 483 to 1625;  range 2 = 1625 to 1674",
                  in.toString());
     DataInputStream inStream = new DataInputStream(in);
     for(int i=0; i < 1024; ++i) {
@@ -234,8 +363,10 @@ public class TestInStream {
       assertEquals(i, inStream.readInt());
     }
 
-    in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
-        new long[]{483, 1625}, 1674, codec, 400);
+    buffers.clear();
+    buffers.add(new BufferChunk(inBuf[1], 483));
+    buffers.add(new BufferChunk(inBuf[2], 1625));
+    in = InStream.create("test", buffers.get(), 1674, options);
     inStream = new DataInputStream(in);
     positions[303].reset();
     in.seek(positions[303]);
@@ -243,8 +374,10 @@ public class TestInStream {
       assertEquals(i, inStream.readInt());
     }
 
-    in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
-        new long[]{0, 1625}, 1674, codec, 400);
+    buffers.clear();
+    buffers.add(new BufferChunk(inBuf[0], 0));
+    buffers.add(new BufferChunk(inBuf[2], 1625));
+    in = InStream.create("test", buffers.get(), 1674, options);
     inStream = new DataInputStream(in);
     positions[1001].reset();
     for(int i=0; i < 300; ++i) {
@@ -278,13 +411,16 @@ public class TestInStream {
     collect.buffer.setByteBuffer(inBuf[1], 1024, 2048);
     collect.buffer.setByteBuffer(inBuf[2], 3072, 1024);
 
-    for(int i=0; i < inBuf.length; ++i) {
-      inBuf[i].flip();
+    for(ByteBuffer buffer: inBuf) {
+      buffer.flip();
     }
-    InStream in = InStream.create("test", inBuf,
-        new long[]{0, 1024, 3072}, 4096, null, 400);
+    BufferChunkList buffers = new BufferChunkList();
+    buffers.add(new BufferChunk(inBuf[0], 0));
+    buffers.add(new BufferChunk(inBuf[1], 1024));
+    buffers.add(new BufferChunk(inBuf[2], 3072));
+    InStream in = InStream.create("test", buffers.get(), 4096);
     assertEquals("uncompressed stream test position: 0 length: 4096" +
-                 " range: 0 offset: 0 limit: 0",
+                 " range: 0 offset: 0 limit: 1024",
                  in.toString());
     DataInputStream inStream = new DataInputStream(in);
     for(int i=0; i < 1024; ++i) {
@@ -297,8 +433,10 @@ public class TestInStream {
       assertEquals(i, inStream.readInt());
     }
 
-    in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
-        new long[]{1024, 3072}, 4096, null, 400);
+    buffers.clear();
+    buffers.add(new BufferChunk(inBuf[1], 1024));
+    buffers.add(new BufferChunk(inBuf[2], 3072));
+    in = InStream.create("test", buffers.get(), 4096);
     inStream = new DataInputStream(in);
     positions[256].reset();
     in.seek(positions[256]);
@@ -306,8 +444,10 @@ public class TestInStream {
       assertEquals(i, inStream.readInt());
     }
 
-    in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
-        new long[]{0, 3072}, 4096, null, 400);
+    buffers.clear();
+    buffers.add(new BufferChunk(inBuf[0], 0));
+    buffers.add(new BufferChunk(inBuf[2], 3072));
+    in = InStream.create("test", buffers.get(), 4096);
     inStream = new DataInputStream(in);
     positions[768].reset();
     for(int i=0; i < 256; ++i) {
@@ -321,9 +461,8 @@ public class TestInStream {
 
   @Test
   public void testEmptyDiskRange() throws IOException {
-    List<DiskRange> rangeList = new ArrayList<>();
-    rangeList.add(new BufferChunk(ByteBuffer.allocate(0), 0));
-    InStream stream = new InStream.UncompressedStream("test", rangeList, 0);
+    DiskRangeList range = new BufferChunk(ByteBuffer.allocate(0), 0);
+    InStream stream = new InStream.UncompressedStream("test", range, 0);
     assertEquals(0, stream.available());
     stream.seek(new PositionProvider() {
       @Override

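The test comment above about keeping the buffer ranges off the 16 byte AES boundaries points at the subtle part of reading an encrypted stream: with AES in CTR mode (which the new EncryptionState.changeIv in the InStream.java diff below says it assumes), seeking to an arbitrary byte offset means re-initializing the cipher with the base IV advanced by offset / 16 counter blocks and then discarding offset % 16 keystream bytes. Below is a minimal, self-contained JCE sketch of that idea; it does not use the ORC classes, and the key, offset, and data are made up for illustration.

import java.security.Key;
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class CtrSeekSketch {
  public static void main(String[] args) throws Exception {
    Key key = new SecretKeySpec(new byte[16], "AES");   // throwaway all-zero key
    byte[] iv = new byte[16];                           // base IV for the stream
    byte[] plain = new byte[100];
    for (int i = 0; i < plain.length; ++i) {
      plain[i] = (byte) i;
    }

    // Encrypt the whole stream once with the base IV.
    Cipher enc = Cipher.getInstance("AES/CTR/NoPadding");
    enc.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
    byte[] cipherText = enc.doFinal(plain);

    // Decrypt starting at offset 37, which is not on a 16 byte block boundary.
    int offset = 37;
    int blockSize = 16;
    long blocks = offset / blockSize;        // whole counter blocks to skip
    int extra = offset % blockSize;          // bytes to discard inside the block
    byte[] advanced = iv.clone();
    int posn = advanced.length - 1;
    while (blocks > 0) {                     // same carry loop as changeIv below
      long sum = (advanced[posn] & 0xff) + blocks;
      advanced[posn--] = (byte) sum;
      blocks = sum / 0x100;
    }
    Cipher dec = Cipher.getInstance("AES/CTR/NoPadding");
    dec.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(advanced));
    if (extra > 0) {
      dec.update(new byte[extra]);           // burn the partial keystream block
    }
    byte[] tail = dec.doFinal(cipherText, offset, cipherText.length - offset);
    System.out.println(tail[0] == plain[offset]);   // prints true
  }
}
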
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
index 399f35e..0888cef 100644
--- a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -55,10 +55,9 @@ public class TestIntegerCompressionReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthIntegerReaderV2 in =
-      new RunLengthIntegerReaderV2(InStream.create
-                                   ("test", new ByteBuffer[]{inBuf},
-                                    new long[]{0}, inBuf.remaining(),
-                                    codec, 1000), true, false);
+      new RunLengthIntegerReaderV2(InStream.create("test",
+          new BufferChunk(inBuf, 0), inBuf.remaining(),
+          InStream.options().withCodec(codec).withBufferSize(1000)), true, false);
     for(int i=0; i < 2048; ++i) {
       int x = (int) in.next();
       if (i < 1024) {
@@ -110,10 +109,8 @@ public class TestIntegerCompressionReader {
     inBuf.flip();
     RunLengthIntegerReaderV2 in =
       new RunLengthIntegerReaderV2(InStream.create("test",
-                                                   new ByteBuffer[]{inBuf},
-                                                   new long[]{0},
-                                                   inBuf.remaining(),
-                                                   null, 100), true, false);
+                                                   new BufferChunk(inBuf, 0),
+                                                   inBuf.remaining()), true, false);
     for(int i=0; i < 2048; i += 10) {
       int x = (int) in.next();
       if (i < 1024) {

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestOutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index 8abd12b..473c7a6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,17 +19,38 @@
 package org.apache.orc.impl;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
+import org.junit.Assume;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.Key;
+import java.security.NoSuchAlgorithmException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class TestOutStream {
 
+  public static final boolean TEST_AES_256;
+  static {
+    try {
+      TEST_AES_256 = Cipher.getMaxAllowedKeyLength("AES") != 128;
+    } catch (NoSuchAlgorithmException e) {
+      throw new IllegalArgumentException("Unknown algorithm", e);
+    }
+  }
+
   @Test
   public void testFlush() throws Exception {
     PhysicalWriter.OutputReceiver receiver =
@@ -44,7 +65,7 @@ public class TestOutStream {
   }
 
   @Test
-  public void testAssertBufferSizeValid() throws Exception {
+  public void testAssertBufferSizeValid() {
     try {
       OutStream.assertBufferSizeValid(1 + (1<<23));
       fail("Invalid buffer-size " + (1 + (1<<23)) + " should have been blocked.");
@@ -55,4 +76,143 @@ public class TestOutStream {
 
     OutStream.assertBufferSizeValid((1<<23) -  1);
   }
+
+  @Test
+  public void testEncryption() throws Exception {
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    EncryptionAlgorithm aes128 = EncryptionAlgorithm.AES_128;
+    byte[] keyBytes = new byte[aes128.keyLength()];
+    for(int i=0; i < keyBytes.length; ++i) {
+      keyBytes[i] = (byte) i;
+    }
+    Key material = new SecretKeySpec(keyBytes, aes128.getAlgorithm());
+    StreamName name = new StreamName(0x34, OrcProto.Stream.Kind.DATA);
+    // test out stripe 18
+    byte[] iv = CryptoUtils.createIvForStream(aes128, name, 18);
+    StreamOptions options = new StreamOptions(50)
+        .withEncryption(aes128, material, iv);
+    OutStream stream = new OutStream("test", options, receiver);
+    byte[] data = new byte[210];
+    for(int i=0; i < data.length; ++i) {
+      data[i] = (byte) (i+3);
+    }
+
+    stream.write(data);
+    stream.flush();
+    byte[] output = receiver.buffer.get();
+
+    // These are the outputs of AES-128 with the key and incrementing IVs.
+    // I included these hardcoded values to make sure that we are getting
+    // AES-128 encryption.
+    //
+    // I used http://extranet.cryptomathic.com/aescalc/index to compute these:
+    // key: 000102030405060708090a0b0c0d0e0f
+    // input: 00003400010000120000000000000000
+    // ecb encrypt output: 822252A81CC7E7FE3E51F50E0E9B64B1
+    int[] generated = new int[]{
+        0x82, 0x22, 0x52, 0xA8, 0x1C, 0xC7, 0xE7, 0xFE, // block 0
+        0x3E, 0x51, 0xF5, 0x0E, 0x0E, 0x9B, 0x64, 0xB1,
+
+        0xF6, 0x4D, 0x36, 0xA9, 0xD9, 0xD7, 0x55, 0xDE, // block 1
+        0xCB, 0xD5, 0x62, 0x0E, 0x6D, 0xA6, 0x6B, 0x16,
+
+        0x00, 0x0B, 0xE8, 0xBA, 0x9D, 0xDE, 0x78, 0xEC, // block 2
+        0x73, 0x05, 0xF6, 0x1E, 0x76, 0xD7, 0x9B, 0x7A,
+
+        0x47, 0xE9, 0x61, 0x90, 0x65, 0x8B, 0x54, 0xAC, // block 3
+        0xF2, 0x3F, 0x67, 0xAE, 0x25, 0x63, 0x1D, 0x4B,
+
+        0x41, 0x48, 0xC4, 0x15, 0x5F, 0x2A, 0x7F, 0x91, // block 4
+        0x9A, 0x87, 0xA1, 0x09, 0xFF, 0x68, 0x68, 0xCC,
+
+        0xC0, 0x80, 0x52, 0xD4, 0xA5, 0x07, 0x4B, 0x79, // block 5
+        0xC7, 0x08, 0x46, 0x46, 0x8C, 0x74, 0x2C, 0x0D,
+
+        0x9F, 0x55, 0x7E, 0xA7, 0x17, 0x47, 0x91, 0xFD, // block 6
+        0x01, 0xD4, 0x24, 0x1F, 0x76, 0xA1, 0xDC, 0xC3,
+
+        0xEA, 0x13, 0x4C, 0x29, 0xCA, 0x68, 0x1E, 0x4F, // block 7
+        0x0D, 0x19, 0xE5, 0x09, 0xF9, 0xC5, 0xF4, 0x15,
+
+        0x9A, 0xAD, 0xC4, 0xA1, 0x0F, 0x28, 0xD4, 0x3D, // block 8
+        0x59, 0xF0, 0x68, 0xD3, 0xC4, 0x98, 0x74, 0x68,
+
+        0x37, 0xA4, 0xF4, 0x7C, 0x02, 0xCE, 0xC6, 0xCA, // block 9
+        0xA1, 0xF8, 0xC3, 0x8C, 0x7B, 0x72, 0x38, 0xD1,
+
+        0xAA, 0x52, 0x90, 0xDE, 0x28, 0xA1, 0x53, 0x6E, // block a
+        0xA6, 0x5C, 0xC0, 0x89, 0xC4, 0x21, 0x76, 0xC0,
+
+        0x1F, 0xED, 0x0A, 0xF9, 0xA2, 0xA7, 0xC1, 0x8D, // block b
+        0xA0, 0x92, 0x44, 0x4F, 0x60, 0x51, 0x7F, 0xD8,
+
+        0x6D, 0x16, 0xAF, 0x46, 0x1C, 0x27, 0x20, 0x1C, // block c
+        0x01, 0xBD, 0xC5, 0x0B, 0x62, 0x3F, 0xEF, 0xEE,
+
+        0x37, 0xae                                      // block d
+    };
+    assertEquals(generated.length, output.length);
+    for(int i=0; i < generated.length; ++i) {
+      assertEquals("i = " + i, (byte) (generated[i] ^ data[i]), output[i]);
+    }
+
+    receiver.buffer.clear();
+    stream.changeIv(CryptoUtils.createIvForStream(aes128, name, 19));
+
+    data = new byte[]{0x47, 0x77, 0x65, 0x6e};
+    stream.write(data);
+    stream.flush();
+    output = receiver.buffer.get();
+    generated = new int[]{0x16, 0x03, 0xE6, 0xC3};
+    assertEquals(generated.length, output.length);
+    for(int i=0; i < generated.length; ++i) {
+      assertEquals("i = " + i, (byte) (generated[i] ^ data[i]), output[i]);
+    }
+  }
+
+  @Test
+  public void testCompression256Encryption() throws Exception {
+    // disable test if AES_256 is not available
+    Assume.assumeTrue(TEST_AES_256);
+    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
+    EncryptionAlgorithm aes256 = EncryptionAlgorithm.AES_256;
+    byte[] keyBytes = new byte[aes256.keyLength()];
+    for(int i=0; i < keyBytes.length; ++i) {
+      keyBytes[i] = (byte) (i * 13);
+    }
+    Key material = new SecretKeySpec(keyBytes, aes256.getAlgorithm());
+    StreamName name = new StreamName(0x1, OrcProto.Stream.Kind.DATA);
+    byte[] iv = CryptoUtils.createIvForStream(aes256, name, 0);
+    StreamOptions options = new StreamOptions(1024)
+        .withCodec(new ZlibCodec())
+        .withEncryption(aes256, material, iv);
+    OutStream stream = new OutStream("test", options, receiver);
+    for(int i=0; i < 10000; ++i) {
+      stream.write(("The Cheesy Poofs " + i + "\n")
+          .getBytes(StandardCharsets.UTF_8));
+    }
+    stream.flush();
+    // get the compressed, encrypted data
+    byte[] encrypted = receiver.buffer.get();
+
+    // decrypt it
+    Cipher decrypt = aes256.createCipher();
+    decrypt.init(Cipher.DECRYPT_MODE, material, new IvParameterSpec(iv));
+    byte[] compressed = decrypt.doFinal(encrypted);
+
+    // use InStream to decompress it
+    BufferChunkList ranges = new BufferChunkList();
+    ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
+    InStream decompressedStream = InStream.create(name.toString(), ranges.get(),
+        compressed.length,
+        InStream.options().withCodec(new ZlibCodec()).withBufferSize(1024));
+
+    // check the contents of the decompressed stream
+    BufferedReader reader
+        = new BufferedReader(new InputStreamReader(decompressedStream));
+    for(int i=0; i < 10000; ++i) {
+      assertEquals("i = " + i, "The Cheesy Poofs " + i, reader.readLine());
+    }
+    assertEquals(null, reader.readLine());
+  }
 }

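As a sanity check on the hard-coded expected bytes in testEncryption above: in CTR mode the keystream for counter block 0 is simply the AES/ECB encryption of the initial counter block, so the first 16 values can be reproduced with the JDK alone instead of the online calculator. The sketch below uses the key and counter block quoted in the test comment; the class name is made up for illustration.

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

public class KeystreamBlockCheck {
  public static void main(String[] args) throws Exception {
    byte[] key = new byte[16];
    for (int i = 0; i < key.length; ++i) {
      key[i] = (byte) i;                 // 000102030405060708090a0b0c0d0e0f
    }
    // Counter block 0 for column 0x34, stream kind DATA (= 1), stripe 18:
    // 3 bytes column, 2 bytes kind, 3 bytes stripe, 8 zero counter bytes.
    byte[] counter0 = {0x00, 0x00, 0x34, 0x00, 0x01, 0x00, 0x00, 0x12,
                       0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
    Cipher ecb = Cipher.getInstance("AES/ECB/NoPadding");
    ecb.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"));
    byte[] block0 = ecb.doFinal(counter0);
    StringBuilder hex = new StringBuilder();
    for (byte b : block0) {
      hex.append(String.format("%02X", b));
    }
    // Expect 822252A81CC7E7FE3E51F50E0E9B64B1, the first 16 keystream bytes
    // hard-coded as "block 0" in testEncryption above.
    System.out.println(hex);
  }
}
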
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
index a14bef1..c0b22fa 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -47,7 +47,7 @@ public class TestRunLengthByteReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
-        new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 100));
+        new BufferChunk(inBuf, 0), inBuf.remaining()));
     for(int i=0; i < 2048; ++i) {
       int x = in.next() & 0xff;
       if (i < 1024) {
@@ -89,7 +89,8 @@ public class TestRunLengthByteReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
-        new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), codec, 500));
+        new BufferChunk(inBuf, 0), inBuf.remaining(),
+        InStream.options().withCodec(codec).withBufferSize(500)));
     for(int i=0; i < 2048; ++i) {
       int x = in.next() & 0xff;
       if (i < 1024) {
@@ -126,7 +127,7 @@ public class TestRunLengthByteReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
-        new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 100));
+        new BufferChunk(inBuf, 0), inBuf.remaining()));
     for(int i=0; i < 2048; i += 10) {
       int x = in.next() & 0xff;
       if (i < 1024) {

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
index 28239ba..f067f83 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -55,8 +55,8 @@ public class TestRunLengthIntegerReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
-        ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
-            codec, 1000), true);
+        ("test", new BufferChunk(inBuf, 0), inBuf.remaining(),
+            InStream.options().withCodec(codec).withBufferSize(1000)), true);
     for(int i=0; i < 2048; ++i) {
       int x = (int) in.next();
       if (i < 1024) {
@@ -107,8 +107,7 @@ public class TestRunLengthIntegerReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
-        ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
-            null, 100), true);
+        ("test", new BufferChunk(inBuf, 0), inBuf.remaining()), true);
     for(int i=0; i < 2048; i += 10) {
       int x = (int) in.next();
       if (i < 1024) {

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index d203415..f9a925b 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,7 +34,6 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -1543,14 +1542,13 @@ public class TestSchemaEvolution {
                            OrcProto.Stream.Kind kind,
                            int... values) throws IOException {
     StreamName name = new StreamName(id, kind);
-    List<DiskRange> ranges = new ArrayList<>();
+    BufferChunkList ranges = new BufferChunkList();
     byte[] buffer = new byte[values.length];
     for(int i=0; i < values.length; ++i) {
       buffer[i] = (byte) values[i];
     }
     ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), 0));
-    streams.put(name, InStream.create(name.toString(), ranges, values.length, null,
-        values.length));
+    streams.put(name, InStream.create(name.toString(), ranges.get(), values.length));
   }
 
   @Test


[2/2] orc git commit: ORC-251: Extend InStream and OutStream to support encryption.

Posted by om...@apache.org.
ORC-251: Extend InStream and OutStream to support encryption.

Fixes #278

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/edbb9673
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/edbb9673
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/edbb9673

Branch: refs/heads/master
Commit: edbb9673d143247633dddcfdbea24f7869b151fe
Parents: f3dd9c1
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 9 09:36:28 2018 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Jul 13 14:28:06 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/orc/CompressionCodec.java   |   5 +
 .../org/apache/orc/impl/AircompressorCodec.java |  11 +-
 .../org/apache/orc/impl/BufferChunkList.java    |  47 ++
 .../java/org/apache/orc/impl/CryptoUtils.java   |  78 +++
 .../src/java/org/apache/orc/impl/InStream.java  | 554 ++++++++++++++-----
 .../src/java/org/apache/orc/impl/OrcTail.java   |   3 +-
 .../src/java/org/apache/orc/impl/OutStream.java | 118 +++-
 .../java/org/apache/orc/impl/ReaderImpl.java    |  89 +--
 .../org/apache/orc/impl/RecordReaderImpl.java   |   6 +-
 .../org/apache/orc/impl/RecordReaderUtils.java  | 106 ++--
 .../orc/impl/SettableUncompressedStream.java    |  43 --
 .../java/org/apache/orc/impl/SnappyCodec.java   |   3 +-
 .../java/org/apache/orc/impl/WriterImpl.java    |   4 +-
 .../src/java/org/apache/orc/impl/ZlibCodec.java |   6 +
 .../apache/orc/impl/writer/StreamOptions.java   |  85 +++
 .../org/apache/orc/impl/TestBitFieldReader.java |  14 +-
 .../test/org/apache/orc/impl/TestBitPack.java   |   5 +-
 .../org/apache/orc/impl/TestCryptoUtils.java    |  47 ++
 .../test/org/apache/orc/impl/TestInStream.java  | 217 ++++++--
 .../orc/impl/TestIntegerCompressionReader.java  |  15 +-
 .../test/org/apache/orc/impl/TestOutStream.java | 164 +++++-
 .../orc/impl/TestRunLengthByteReader.java       |   7 +-
 .../orc/impl/TestRunLengthIntegerReader.java    |   7 +-
 .../apache/orc/impl/TestSchemaEvolution.java    |   8 +-
 24 files changed, 1230 insertions(+), 412 deletions(-)
----------------------------------------------------------------------

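For orientation, the round trip that the extended streams support looks like the following when boiled down from TestInStream.testEncrypted in this patch. It reuses the test's OutputCollector helper; the stream name, the all-zero key bytes, and the single long written are placeholders, while the classes and methods are the ones added or changed by this commit.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.security.Key;
import javax.crypto.spec.SecretKeySpec;
import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.OrcProto;
import org.apache.orc.impl.*;
import org.apache.orc.impl.writer.StreamOptions;

public class EncryptedRoundTripSketch {
  public static void main(String[] args) throws Exception {
    EncryptionAlgorithm algorithm = EncryptionAlgorithm.AES_128;
    Key key = new SecretKeySpec(new byte[algorithm.keyLength()],
                                algorithm.getAlgorithm());
    StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
    byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);

    // Writer side: bytes are encrypted as they are flushed to the receiver.
    TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
    OutStream out = new OutStream("sketch",
        new StreamOptions(100).withEncryption(algorithm, key, iv), receiver);
    new DataOutputStream(out).writeLong(42L);
    out.flush();

    // Reader side: the same algorithm, key, and iv reproduce the plaintext.
    int size = receiver.buffer.size();
    ByteBuffer data = ByteBuffer.allocate(size);
    receiver.buffer.setByteBuffer(data, 0, size);
    data.flip();
    BufferChunkList ranges = new BufferChunkList();
    ranges.add(new BufferChunk(data, 0));
    InStream in = InStream.create("sketch", ranges.get(), size,
        InStream.options().withEncryption(algorithm, key, iv));
    System.out.println(new DataInputStream(in).readLong());   // prints 42
  }
}
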

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index dd517b3..1d2af57 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -69,4 +69,9 @@ public interface CompressionCodec {
 
   /** Closes the codec, releasing the resources. */
   void close();
+
+  /**
+   * Get the compression kind.
+   */
+  CompressionKind getKind();
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index 39d678c..2609d26 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -21,16 +21,20 @@ package org.apache.orc.impl;
 import io.airlift.compress.Compressor;
 import io.airlift.compress.Decompressor;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 public class AircompressorCodec implements CompressionCodec {
+  private final CompressionKind kind;
   private final Compressor compressor;
   private final Decompressor decompressor;
 
-  AircompressorCodec(Compressor compressor, Decompressor decompressor) {
+  AircompressorCodec(CompressionKind kind, Compressor compressor,
+                     Decompressor decompressor) {
+    this.kind = kind;
     this.compressor = compressor;
     this.decompressor = decompressor;
   }
@@ -109,4 +113,9 @@ public class AircompressorCodec implements CompressionCodec {
   public void close() {
     // Nothing to do.
   }
+
+  @Override
+  public CompressionKind getKind() {
+    return kind;
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
new file mode 100644
index 0000000..d8a89db
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * Builds a list of buffer chunks
+ */
+public class BufferChunkList {
+  private BufferChunk head;
+  private BufferChunk tail;
+
+  public void add(BufferChunk value) {
+    if (head == null) {
+      head = value;
+      tail = value;
+    } else {
+      tail.next = value;
+      value.prev = tail;
+      tail = value;
+    }
+  }
+
+  public BufferChunk get() {
+    return head;
+  }
+
+  public void clear() {
+    head = null;
+    tail = null;
+  }
+}

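BufferChunkList is a small builder over the doubly-linked DiskRangeList nodes that InStream now consumes; the tests above use it in the pattern sketched here (buffer contents and offsets are arbitrary).

import java.nio.ByteBuffer;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.impl.BufferChunkList;

public class BufferChunkListSketch {
  public static void main(String[] args) {
    // Collect the ranges of a stream, then hand the head of the chain
    // to InStream.create, as the tests above do.
    BufferChunkList list = new BufferChunkList();
    list.add(new BufferChunk(ByteBuffer.wrap(new byte[]{1, 2, 3}), 0));
    list.add(new BufferChunk(ByteBuffer.wrap(new byte[]{4, 5, 6}), 3));
    for (DiskRangeList r = list.get(); r != null; r = r.next) {
      System.out.println(r.getOffset() + " .. " + r.getEnd());
    }
    list.clear();   // drop the references when done
  }
}
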
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/CryptoUtils.java b/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
new file mode 100644
index 0000000..072b06b
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.EncryptionAlgorithm;
+import java.security.SecureRandom;
+
+/**
+ * This class has routines to work with encryption within ORC files.
+ */
+public class CryptoUtils {
+
+  private static final int COLUMN_ID_LENGTH = 3;
+  private static final int KIND_LENGTH = 2;
+  private static final int STRIPE_ID_LENGTH = 3;
+  private static final int MIN_COUNT_BYTES = 8;
+
+  static final int MAX_COLUMN = 0xffffff;
+  static final int MAX_KIND = 0xffff;
+  static final int MAX_STRIPE = 0xffffff;
+
+  /**
+   * Create a unique IV for each stream within a single key.
+   * The top bytes are set with the column, stream kind, and stripe id and the
+   * lower 8 bytes are always 0.
+   * @param name the stream name
+   * @param stripeId the stripe id
+   * @return the iv for the stream
+   */
+  public static byte[] createIvForStream(EncryptionAlgorithm algorithm,
+                                         StreamName name,
+                                         int stripeId) {
+    byte[] iv = new byte[algorithm.getIvLength()];
+    int columnId = name.getColumn();
+    if (columnId < 0 || columnId > MAX_COLUMN) {
+      throw new IllegalArgumentException("ORC encryption is limited to " +
+          MAX_COLUMN + " columns. Value = " + columnId);
+    }
+    int k = name.getKind().getNumber();
+    if (k < 0 || k > MAX_KIND) {
+      throw new IllegalArgumentException("ORC encryption is limited to " +
+          MAX_KIND + " stream kinds. Value = " + k);
+    }
+    if (stripeId < 0 || stripeId > MAX_STRIPE){
+      throw new IllegalArgumentException("ORC encryption is limited to " +
+          MAX_STRIPE + " stripes. Value = " + stripeId);
+    }
+    // the rest of the iv is used for counting within the stream
+    if (iv.length - (COLUMN_ID_LENGTH + KIND_LENGTH + STRIPE_ID_LENGTH) < MIN_COUNT_BYTES) {
+      throw new IllegalArgumentException("Not enough space in the iv for the count");
+    }
+    iv[0] = (byte)(columnId >> 16);
+    iv[1] = (byte)(columnId >> 8);
+    iv[2] = (byte)columnId;
+    iv[COLUMN_ID_LENGTH] = (byte)(k >> 8);
+    iv[COLUMN_ID_LENGTH+1] = (byte)(k);
+    iv[COLUMN_ID_LENGTH+KIND_LENGTH] = (byte)(stripeId >> 16);
+    iv[COLUMN_ID_LENGTH+KIND_LENGTH+1] = (byte)(stripeId >> 8);
+    iv[COLUMN_ID_LENGTH+KIND_LENGTH+2] = (byte)stripeId;
+    return iv;
+  }
+}

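To make the IV layout concrete: for the stream used in TestOutStream.testEncryption above (column 0x34, stream kind DATA = 1, stripe 18), createIvForStream packs the 16 byte AES IV as

  bytes 0-2    column id    00 00 34
  bytes 3-4    stream kind  00 01
  bytes 5-7    stripe id    00 00 12
  bytes 8-15   counter      00 00 00 00 00 00 00 00

which is exactly the counter block quoted in that test's comment. The 3 + 2 + 3 header bytes leave the 8 low bytes (MIN_COUNT_BYTES) free for CTR mode to count 16 byte blocks within the stream.
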
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 94e9232..06f439e 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,17 +20,22 @@ package org.apache.orc.impl;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
 
+import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
 
 import com.google.protobuf.CodedInputStream;
 
+import javax.crypto.Cipher;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
+
 public abstract class InStream extends InputStream {
 
   private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
@@ -55,66 +60,101 @@ public abstract class InStream extends InputStream {
   @Override
   public abstract void close();
 
+  static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
+    int result = 0;
+    DiskRangeList range = list;
+    while (range != null && range != current) {
+      result += 1;
+      range = range.next;
+    }
+    return result;
+  }
+
+  /**
+   * Implements a stream over an uncompressed stream.
+   */
   public static class UncompressedStream extends InStream {
-    private List<DiskRange> bytes;
+    private DiskRangeList bytes;
     private long length;
     protected long currentOffset;
-    private ByteBuffer range;
-    private int currentRange;
+    protected ByteBuffer decrypted;
+    protected DiskRangeList currentRange;
+
+    /**
+     * Create the stream without calling reset on it.
+     * This is used for the subclass that needs to do more setup.
+     * @param name name of the stream
+     * @param length the number of bytes for the stream
+     */
+    public UncompressedStream(String name, long length) {
+      super(name, length);
+    }
 
-    public UncompressedStream(String name, List<DiskRange> input, long length) {
+    public UncompressedStream(String name,
+                              DiskRangeList input,
+                              long length) {
       super(name, length);
       reset(input, length);
     }
 
-    protected void reset(List<DiskRange> input, long length) {
+    protected void reset(DiskRangeList input, long length) {
       this.bytes = input;
       this.length = length;
-      currentRange = 0;
-      currentOffset = 0;
-      range = null;
+      currentOffset = input == null ? 0 : input.getOffset();
+      setCurrent(input, true);
     }
 
     @Override
     public int read() {
-      if (range == null || range.remaining() == 0) {
+      if (decrypted == null || decrypted.remaining() == 0) {
         if (currentOffset == length) {
           return -1;
         }
-        seek(currentOffset);
+        setCurrent(currentRange.next, false);
       }
       currentOffset += 1;
-      return 0xff & range.get();
+      return 0xff & decrypted.get();
+    }
+
+    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+      currentRange = newRange;
+      if (newRange != null) {
+        decrypted = newRange.getData().slice();
+        // Move the position in the ByteBuffer to match the currentOffset,
+        // which is relative to the stream.
+        decrypted.position((int) (currentOffset - newRange.getOffset()));
+      }
     }
 
     @Override
     public int read(byte[] data, int offset, int length) {
-      if (range == null || range.remaining() == 0) {
+      if (decrypted == null || decrypted.remaining() == 0) {
         if (currentOffset == this.length) {
           return -1;
         }
-        seek(currentOffset);
+        setCurrent(currentRange.next, false);
       }
-      int actualLength = Math.min(length, range.remaining());
-      range.get(data, offset, actualLength);
+      int actualLength = Math.min(length, decrypted.remaining());
+      decrypted.get(data, offset, actualLength);
       currentOffset += actualLength;
       return actualLength;
     }
 
     @Override
     public int available() {
-      if (range != null && range.remaining() > 0) {
-        return range.remaining();
+      if (decrypted != null && decrypted.remaining() > 0) {
+        return decrypted.remaining();
       }
       return (int) (length - currentOffset);
     }
 
     @Override
     public void close() {
-      currentRange = bytes.size();
+      currentRange = null;
       currentOffset = length;
       // explicit de-ref of bytes[]
-      bytes.clear();
+      decrypted = null;
+      bytes = null;
     }
 
     @Override
@@ -122,45 +162,37 @@ public abstract class InStream extends InputStream {
       seek(index.getNext());
     }
 
-    public void seek(long desired) {
-      if (desired == 0 && bytes.isEmpty()) {
+    public void seek(long desired) throws IOException {
+      if (desired == 0 && bytes == null) {
         return;
       }
-      int i = 0;
-      for (DiskRange curRange : bytes) {
-        if (curRange.getOffset() <= desired &&
-            (desired - curRange.getOffset()) < curRange.getLength()) {
-          currentOffset = desired;
-          currentRange = i;
-          this.range = curRange.getData().duplicate();
-          int pos = range.position();
-          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
-          this.range.position(pos);
-          return;
-        }
-        ++i;
-      }
-      // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+      // If we are seeking inside of the current range, just reposition.
+      if (currentRange != null && desired >= currentRange.getOffset() &&
+          desired < currentRange.getEnd()) {
+        decrypted.position((int) (desired - currentRange.getOffset()));
         currentOffset = desired;
-        currentRange = segments - 1;
-        DiskRange curRange = bytes.get(currentRange);
-        this.range = curRange.getData().duplicate();
-        int pos = range.position();
-        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
-        this.range.position(pos);
-        return;
+      } else {
+        for (DiskRangeList curRange = bytes; curRange != null;
+             curRange = curRange.next) {
+          if (curRange.getOffset() <= desired &&
+              (curRange.next == null ? desired <= curRange.getEnd() :
+                  desired < curRange.getEnd())) {
+            currentOffset = desired;
+            setCurrent(curRange, true);
+            return;
+          }
+        }
+        throw new IllegalArgumentException("Seek in " + name + " to " +
+            desired + " is outside of the data");
       }
-      throw new IllegalArgumentException("Seek in " + name + " to " +
-        desired + " is outside of the data");
     }
 
     @Override
     public String toString() {
       return "uncompressed stream " + name + " position: " + currentOffset +
-          " length: " + length + " range: " + currentRange +
-          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+          " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
+          " offset: " + (decrypted == null ? 0 : decrypted.position()) +
+          " limit: " + (decrypted == null ? 0 : decrypted.limit());
     }
   }
 
@@ -173,33 +205,208 @@ public abstract class InStream extends InputStream {
     }
   }
 
+  /**
+   * Manage the state of the decryption, including the ability to seek.
+   */
+  static class EncryptionState {
+    private final String name;
+    private final EncryptionAlgorithm algorithm;
+    private final Key key;
+    private final byte[] iv;
+    private final Cipher cipher;
+    private ByteBuffer decrypted;
+
+    EncryptionState(String name, StreamOptions options) {
+      this.name = name;
+      algorithm = options.algorithm;
+      key = options.key;
+      iv = options.iv;
+      cipher = algorithm.createCipher();
+    }
+
+    /**
+     * We are seeking to a new range, so update the cipher to change the IV
+     * to match. This code assumes that we only support encryption in CTR mode.
+     * @param offset where we are seeking to in the stream
+     */
+    void changeIv(long offset) {
+      int blockSize = cipher.getBlockSize();
+      long encryptionBlocks = offset / blockSize;
+      long extra = offset % blockSize;
+      byte[] advancedIv;
+      if (encryptionBlocks == 0) {
+        advancedIv = iv;
+      } else {
+        // Add the encryption blocks into the initial iv, to compensate for
+        // skipping over decrypting those bytes.
+        advancedIv = new byte[iv.length];
+        System.arraycopy(iv, 0, advancedIv, 0, iv.length);
+        int posn = iv.length - 1;
+        while (encryptionBlocks > 0) {
+          long sum = (advancedIv[posn] & 0xff) + encryptionBlocks;
+          advancedIv[posn--] = (byte) sum;
+          encryptionBlocks =  sum / 0x100;
+        }
+      }
+      try {
+        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(advancedIv));
+        // If the range starts at an offset that doesn't match the encryption
+        // block, we need to advance some bytes within an encryption block.
+        if (extra > 0) {
+          byte[] wasted = new byte[(int) extra];
+          cipher.update(wasted, 0, wasted.length, wasted, 0);
+        }
+      } catch (InvalidKeyException e) {
+        throw new IllegalArgumentException("Invalid key on " + name, e);
+      } catch (InvalidAlgorithmParameterException e) {
+        throw new IllegalArgumentException("Invalid iv on " + name, e);
+      } catch (ShortBufferException e) {
+        throw new IllegalArgumentException("Short buffer in " + name, e);
+      }
+    }
+
+    /**
+     * Decrypt the given range into the decrypted buffer. It is assumed that
+     * the cipher is correctly initialized by changeIv before this is called.
+     * @param newRange the range to decrypt
+     * @return a reused ByteBuffer, which is used by each call to decrypt
+     */
+    ByteBuffer decrypt(DiskRangeList newRange)  {
+      final long offset = newRange.getOffset();
+      final int length = newRange.getLength();
+      if (decrypted == null || decrypted.capacity() < length) {
+        decrypted = ByteBuffer.allocate(length);
+      } else {
+        decrypted.clear();
+      }
+      ByteBuffer encrypted = newRange.getData().duplicate();
+      try {
+        int output = cipher.update(encrypted, decrypted);
+        if (output != length) {
+          throw new IllegalArgumentException("Problem decrypting " + name +
+              " at " + offset);
+        }
+      } catch (ShortBufferException e) {
+        throw new IllegalArgumentException("Problem decrypting " + name +
+            " at " + offset, e);
+      }
+      decrypted.flip();
+      return decrypted;
+    }
+
+    void close() {
+      decrypted = null;
+    }
+  }
+
+  /**
+   * Implements a stream over an encrypted, but uncompressed stream.
+   */
+  public static class EncryptedStream extends UncompressedStream {
+    private final EncryptionState encrypt;
+
+    public EncryptedStream(String name, DiskRangeList input, long length,
+                           StreamOptions options) {
+      super(name, length);
+      encrypt = new EncryptionState(name, options);
+      reset(input, length);
+    }
+
+    @Override
+    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+      currentRange = newRange;
+      if (newRange != null) {
+        if (isJump) {
+          encrypt.changeIv(newRange.getOffset());
+        }
+        decrypted = encrypt.decrypt(newRange);
+        decrypted.position((int) (currentOffset - newRange.getOffset()));
+      }
+    }
+
+    @Override
+    public void close() {
+      super.close();
+      encrypt.close();
+    }
+
+    @Override
+    public String toString() {
+      return "encrypted " + super.toString();
+    }
+  }
+
   private static class CompressedStream extends InStream {
-    private final List<DiskRange> bytes;
+    private DiskRangeList bytes;
     private final int bufferSize;
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
-    private ByteBuffer compressed;
-    private long currentOffset;
-    private int currentRange;
+    protected ByteBuffer compressed;
+    protected long currentOffset;
+    protected DiskRangeList currentRange;
     private boolean isUncompressedOriginal;
 
-    public CompressedStream(String name, List<DiskRange> input, long length,
-                            CompressionCodec codec, int bufferSize) {
+    /**
+     * Create the stream without resetting the input stream.
+     * This is used in subclasses so they can finish initializing before
+     * reset is called.
+     * @param name the name of the stream
+     * @param length the total number of bytes in the stream
+     * @param options the options used to read the stream
+     */
+    public CompressedStream(String name,
+                            long length,
+                            StreamOptions options) {
       super(name, length);
-      this.bytes = input;
-      this.codec = codec;
-      this.bufferSize = bufferSize;
-      currentOffset = 0;
-      currentRange = 0;
+      this.codec = options.codec;
+      this.bufferSize = options.bufferSize;
+    }
+
+    /**
+     * Create the stream and initialize the input for the stream.
+     * @param name the name of the stream
+     * @param input the input data
+     * @param length the total length of the stream
+     * @param options the options to read the data with
+     */
+    public CompressedStream(String name,
+                            DiskRangeList input,
+                            long length,
+                            StreamOptions options) {
+      super(name, length);
+      this.codec = options.codec;
+      this.bufferSize = options.bufferSize;
+      reset(input, length);
+    }
+
+    /**
+     * Reset the input to a new set of data.
+     * @param input the input data
+     * @param length the number of bytes in the stream
+     */
+    void reset(DiskRangeList input, long length) {
+      bytes = input;
+      this.length = length;
+      currentOffset = input == null ? 0 : input.getOffset();
+      setCurrent(input, true);
     }
 
     private void allocateForUncompressed(int size, boolean isDirect) {
       uncompressed = allocateBuffer(size, isDirect);
     }
 
+    protected void setCurrent(DiskRangeList newRange,
+                              boolean isJump) {
+      currentRange = newRange;
+      if (newRange != null) {
+        compressed = newRange.getData().slice();
+        compressed.position((int) (currentOffset - newRange.getOffset()));
+      }
+    }
+
     private void readHeader() throws IOException {
       if (compressed == null || compressed.remaining() <= 0) {
-        seek(currentOffset);
+        setCurrent(currentRange.next, false);
       }
       if (compressed.remaining() > OutStream.HEADER_SIZE) {
         int b0 = compressed.get() & 0xff;
@@ -277,9 +484,9 @@ public abstract class InStream extends InputStream {
     public void close() {
       uncompressed = null;
       compressed = null;
-      currentRange = bytes.size();
+      currentRange = null;
       currentOffset = length;
-      bytes.clear();
+      bytes = null;
     }
 
     @Override
@@ -299,6 +506,7 @@ public abstract class InStream extends InputStream {
     /* slices a read only contiguous buffer of chunkLength */
     private ByteBuffer slice(int chunkLength) throws IOException {
       int len = chunkLength;
+      final DiskRangeList oldRange = currentRange;
       final long oldOffset = currentOffset;
       ByteBuffer slice;
       if (compressed.remaining() >= len) {
@@ -308,7 +516,7 @@ public abstract class InStream extends InputStream {
         currentOffset += len;
         compressed.position(compressed.position() + len);
         return slice;
-      } else if (currentRange >= (bytes.size() - 1)) {
+      } else if (currentRange.next == null) {
         // nothing has been modified yet
         throw new IOException("EOF in " + this + " while trying to read " +
             chunkLength + " bytes");
@@ -326,21 +534,19 @@ public abstract class InStream extends InputStream {
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
-      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
 
-      while (len > 0 && iter.hasNext()) {
-        ++currentRange;
+      while (currentRange.next != null) {
+        setCurrent(currentRange.next, false);
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
         }
-        DiskRange range = iter.next();
-        compressed = range.getData().duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();
           slice.limit(len);
           copy.put(slice);
           currentOffset += len;
           compressed.position(compressed.position() + len);
+          copy.flip();
           return copy;
         }
         currentOffset += compressed.remaining();
@@ -349,37 +555,24 @@ public abstract class InStream extends InputStream {
       }
 
       // restore offsets for exception clarity
-      seek(oldOffset);
+      currentOffset = oldOffset;
+      setCurrent(oldRange, true);
       throw new IOException("EOF in " + this + " while trying to read " +
           chunkLength + " bytes");
     }
 
-    private void seek(long desired) throws IOException {
-      if (desired == 0 && bytes.isEmpty()) {
+    void seek(long desired) throws IOException {
+      if (desired == 0 && bytes == null) {
         return;
       }
-      int i = 0;
-      for (DiskRange range : bytes) {
-        if (range.getOffset() <= desired && desired < range.getEnd()) {
-          currentRange = i;
-          compressed = range.getData().duplicate();
-          int pos = compressed.position();
-          pos += (int)(desired - range.getOffset());
-          compressed.position(pos);
+      for (DiskRangeList range = bytes; range != null; range = range.next) {
+        if (range.getOffset() <= desired &&
+            (range.next == null ? desired <= range.getEnd() :
+              desired < range.getEnd())) {
           currentOffset = desired;
+          setCurrent(range, true);
           return;
         }
-        ++i;
-      }
-      // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
-        DiskRange range = bytes.get(segments - 1);
-        currentRange = segments - 1;
-        compressed = range.getData().duplicate();
-        compressed.position(compressed.limit());
-        currentOffset = desired;
-        return;
       }
       throw new IOException("Seek outside of data in " + this + " to " + desired);
     }
@@ -387,12 +580,16 @@ public abstract class InStream extends InputStream {
     private String rangeString() {
       StringBuilder builder = new StringBuilder();
       int i = 0;
-      for (DiskRange range : bytes) {
+      for (DiskRangeList range = bytes; range != null; range = range.next){
         if (i != 0) {
           builder.append("; ");
         }
-        builder.append(" range " + i + " = " + range.getOffset()
-            + " to " + (range.getEnd() - range.getOffset()));
+        builder.append(" range ");
+        builder.append(i);
+        builder.append(" = ");
+        builder.append(range.getOffset());
+        builder.append(" to ");
+        builder.append(range.getEnd());
         ++i;
       }
       return builder.toString();
@@ -401,8 +598,9 @@ public abstract class InStream extends InputStream {
     @Override
     public String toString() {
       return "compressed stream " + name + " position: " + currentOffset +
-          " length: " + length + " range: " + currentRange +
-          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+          " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
+          " offset: " + (compressed == null ? 0 : compressed.position()) +
+          " limit: " + (compressed == null ? 0 : compressed.limit()) +
           rangeString() +
           (uncompressed == null ? "" :
               " uncompressed: " + uncompressed.position() + " to " +
@@ -410,33 +608,116 @@ public abstract class InStream extends InputStream {
     }
   }
 
+  private static class EncryptedCompressedStream extends CompressedStream {
+    private final EncryptionState encrypt;
+
+    public EncryptedCompressedStream(String name,
+                                     DiskRangeList input,
+                                     long length,
+                                     StreamOptions options) {
+      super(name, length, options);
+      encrypt = new EncryptionState(name, options);
+      reset(input, length);
+    }
+
+    @Override
+    protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+      currentRange = newRange;
+      if (newRange != null) {
+        if (isJump) {
+          encrypt.changeIv(newRange.getOffset());
+        }
+        compressed = encrypt.decrypt(newRange);
+        compressed.position((int) (currentOffset - newRange.getOffset()));
+      }
+    }
+
+    @Override
+    public void close() {
+      super.close();
+      encrypt.close();
+    }
+
+    @Override
+    public String toString() {
+      return "encrypted " + super.toString();
+    }
+  }
+
   public abstract void seek(PositionProvider index) throws IOException;
 
+  public static class StreamOptions implements Cloneable {
+    private CompressionCodec codec;
+    private int bufferSize;
+    private EncryptionAlgorithm algorithm;
+    private Key key;
+    private byte[] iv;
+
+    public StreamOptions withCodec(CompressionCodec value) {
+      this.codec = value;
+      return this;
+    }
+
+    public StreamOptions withBufferSize(int value) {
+      bufferSize = value;
+      return this;
+    }
+
+    public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
+                                        Key key,
+                                        byte[] iv) {
+      this.algorithm = algorithm;
+      this.key = key;
+      this.iv = iv;
+      return this;
+    }
+
+    public CompressionCodec getCodec() {
+      return codec;
+    }
+
+    @Override
+    public StreamOptions clone() {
+      try {
+        StreamOptions clone = (StreamOptions) super.clone();
+        if (clone.codec != null) {
+          // Make sure we don't share the same codec between two readers.
+          clone.codec = OrcCodecPool.getCodec(codec.getKind());
+        }
+        return clone;
+      } catch (CloneNotSupportedException e) {
+        throw new UnsupportedOperationException("uncloneable", e);
+      }
+    }
+  }
+
+  public static StreamOptions options() {
+    return new StreamOptions();
+  }
+
   /**
-   * Create an input stream from a list of buffers.
-   * @param streamName the name of the stream
-   * @param buffers the list of ranges of bytes for the stream
-   * @param offsets a list of offsets (the same length as input) that must
-   *                contain the first offset of the each set of bytes in input
+   * Create an input stream from a list of disk ranges with data.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream; from disk or cache
    * @param length the length in bytes of the stream
-   * @param codec the compression codec
-   * @param bufferSize the compression buffer size
+   * @param options the options to read with
    * @return an input stream
-   * @throws IOException
    */
-  //@VisibleForTesting
-  @Deprecated
-  public static InStream create(String streamName,
-                                ByteBuffer[] buffers,
-                                long[] offsets,
+  public static InStream create(String name,
+                                DiskRangeList input,
                                 long length,
-                                CompressionCodec codec,
-                                int bufferSize) throws IOException {
-    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
-    for (int i = 0; i < buffers.length; ++i) {
-      input.add(new BufferChunk(buffers[i], offsets[i]));
+                                StreamOptions options) {
+    if (options == null || options.codec == null) {
+      if (options == null || options.key == null) {
+        return new UncompressedStream(name, input, length);
+      } else {
+        return new EncryptedStream(name, input, length, options);
+      }
+    } else if (options.key == null) {
+      return new CompressedStream(name, input, length, options);
+    } else {
+      return new EncryptedCompressedStream(name, input, length, options);
     }
-    return create(streamName, input, length, codec, bufferSize);
   }
 
   /**
@@ -444,41 +725,22 @@ public abstract class InStream extends InputStream {
    * @param name the name of the stream
    * @param input the list of ranges of bytes for the stream; from disk or cache
    * @param length the length in bytes of the stream
-   * @param codec the compression codec
-   * @param bufferSize the compression buffer size
    * @return an input stream
-   * @throws IOException
    */
   public static InStream create(String name,
-                                List<DiskRange> input,
-                                long length,
-                                CompressionCodec codec,
-                                int bufferSize) throws IOException {
-    if (codec == null) {
-      return new UncompressedStream(name, input, length);
-    } else {
-      return new CompressedStream(name, input, length, codec, bufferSize);
-    }
+                                DiskRangeList input,
+                                long length) throws IOException {
+    return create(name, input, length, null);
   }
 
   /**
-   * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
+   * Creates coded input stream (used for protobuf message parsing) with higher
+   * message size limit.
    *
-   * @param name       the name of the stream
-   * @param input      the list of ranges of bytes for the stream; from disk or cache
-   * @param length     the length in bytes of the stream
-   * @param codec      the compression codec
-   * @param bufferSize the compression buffer size
+   * @param inStream   the stream to wrap.
    * @return coded input stream
-   * @throws IOException
    */
-  public static CodedInputStream createCodedInputStream(
-      String name,
-      List<DiskRange> input,
-      long length,
-      CompressionCodec codec,
-      int bufferSize) throws IOException {
-    InStream inStream = create(name, input, length, codec, bufferSize);
+  public static CodedInputStream createCodedInputStream(InStream inStream) {
     CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
     codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
     return codedInputStream;

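As a usage sketch (not part of the patch): callers now describe a stream with InStream.options() and hand create() a DiskRangeList, and the factory picks the uncompressed, compressed, encrypted, or encrypted-and-compressed variant from the options. The class below is illustrative only; the stream name, key bytes, and IV are placeholders, and wrapping the key in a SecretKeySpec with "AES" is an assumption, while options(), withEncryption(), BufferChunk, and create() are the APIs shown above.

package org.apache.orc.impl;

import org.apache.orc.EncryptionAlgorithm;

import javax.crypto.spec.SecretKeySpec;
import java.nio.ByteBuffer;
import java.security.Key;

public class InStreamFactorySketch {
  /** Open an encrypted, uncompressed stream over a single in-memory buffer. */
  static InStream openEncrypted(ByteBuffer data, byte[] keyBytes, byte[] iv) {
    Key key = new SecretKeySpec(keyBytes, "AES");
    InStream.StreamOptions options = InStream.options()
        .withEncryption(EncryptionAlgorithm.AES_128, key, iv);
    // A key with no codec selects EncryptedStream; adding
    // .withCodec(...).withBufferSize(...) would select EncryptedCompressedStream.
    return InStream.create("sketch", new BufferChunk(data, 0),
        data.remaining(), options);
  }
}

The resulting stream can also be handed to InStream.createCodedInputStream() when it carries a protobuf message, as the reader changes further down do for footers and indexes.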
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java
index 3c78874..9e8a5f2 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcTail.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java
@@ -109,7 +109,8 @@ public final class OrcTail {
       CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind());
       try {
         metadata = extractMetadata(serializedTail, 0,
-            (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize());
+            (int) fileTail.getPostscript().getMetadataLength(),
+            InStream.options().withCodec(codec).withBufferSize(getCompressionBufferSize()));
       } finally {
         OrcCodecPool.returnCodec(getCompressionKind(), codec);
       }

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index d6302d5..435f43c 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,13 +18,26 @@
 package org.apache.orc.impl;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
 import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
 
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
 
+/**
+ * The output stream for writing to ORC files.
+ * It handles both compression and encryption.
+ */
 public class OutStream extends PositionedOutputStream {
-
   public static final int HEADER_SIZE = 3;
   private final String name;
   private final PhysicalWriter.OutputReceiver receiver;
@@ -55,19 +68,89 @@ public class OutStream extends PositionedOutputStream {
   private final CompressionCodec codec;
   private long compressedBytes = 0;
   private long uncompressedBytes = 0;
+  private final Cipher cipher;
+  private final Key key;
 
   public OutStream(String name,
                    int bufferSize,
                    CompressionCodec codec,
-                   PhysicalWriter.OutputReceiver receiver) throws IOException {
+                   PhysicalWriter.OutputReceiver receiver) {
+    this(name, new StreamOptions(bufferSize).withCodec(codec), receiver);
+  }
+
+  public OutStream(String name,
+                   StreamOptions options,
+                   PhysicalWriter.OutputReceiver receiver) {
     this.name = name;
-    this.bufferSize = bufferSize;
-    this.codec = codec;
+    this.bufferSize = options.getBufferSize();
+    this.codec = options.getCodec();
     this.receiver = receiver;
+    if (options.isEncrypted()) {
+      this.cipher = options.getAlgorithm().createCipher();
+      this.key = options.getKey();
+      changeIv(options.getIv());
+    } else {
+      this.cipher = null;
+      this.key = null;
+    }
   }
 
-  public void clear() throws IOException {
-    flush();
+  /**
+   * Change the current Initialization Vector (IV) for the encryption.
+   */
+  void changeIv(byte[] newIv) {
+    try {
+      cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(newIv));
+    } catch (InvalidKeyException e) {
+      throw new IllegalStateException("ORC bad encryption key for " +
+          toString(), e);
+    } catch (InvalidAlgorithmParameterException e) {
+      throw new IllegalStateException("ORC bad encryption parameter for " +
+          toString(), e);
+    }
+  }
+
+  /**
+   * When a buffer is done, we send it to the receiver to store.
+   * If we are encrypting, encrypt the buffer before we pass it on.
+   * @param buffer the buffer to store
+   */
+  void outputBuffer(ByteBuffer buffer) throws IOException {
+    if (cipher != null) {
+      ByteBuffer output = buffer.duplicate();
+      int len = buffer.remaining();
+      try {
+        int encrypted = cipher.update(buffer, output);
+        output.flip();
+        receiver.output(output);
+        if (encrypted != len) {
+          throw new IllegalArgumentException("Encryption of incomplete buffer "
+              + len + " -> " + encrypted + " in " + toString());
+        }
+      } catch (ShortBufferException e) {
+        throw new IOException("Short buffer in encryption in " + toString(), e);
+      }
+    } else {
+      receiver.output(buffer);
+    }
+  }
+
+  /**
+   * Ensure that the cipher didn't buffer any leftover data.
+   * The next call should be changeIv, which restarts the encryption with a new IV.
+   */
+  void finishEncryption() {
+    try {
+      byte[] finalBytes = cipher.doFinal();
+      if (finalBytes != null && finalBytes.length != 0) {
+        throw new IllegalStateException("We shouldn't have remaining bytes " +
+            toString());
+      }
+    } catch (IllegalBlockSizeException e) {
+      throw new IllegalArgumentException("Bad block size", e);
+    } catch (BadPaddingException e) {
+      throw new IllegalArgumentException("Bad padding", e);
+    }
   }
 
   /**
@@ -90,7 +173,7 @@ public class OutStream extends PositionedOutputStream {
     buffer.put(position + 2, (byte) (val >> 15));
   }
 
-  private void getNewInputBuffer() throws IOException {
+  private void getNewInputBuffer() {
     if (codec == null) {
       current = ByteBuffer.allocate(bufferSize);
     } else {
@@ -117,11 +200,11 @@ public class OutStream extends PositionedOutputStream {
   /**
    * Allocate a new output buffer if we are compressing.
    */
-  private ByteBuffer getNewOutputBuffer() throws IOException {
+  private ByteBuffer getNewOutputBuffer() {
     return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
   }
 
-  private void flip() throws IOException {
+  private void flip() {
     current.limit(current.position());
     current.position(codec == null ? 0 : HEADER_SIZE);
   }
@@ -165,7 +248,7 @@ public class OutStream extends PositionedOutputStream {
     }
     flip();
     if (codec == null) {
-      receiver.output(current);
+      outputBuffer(current);
       getNewInputBuffer();
     } else {
       if (compressed == null) {
@@ -190,7 +273,7 @@ public class OutStream extends PositionedOutputStream {
         // if we have less than the next header left, spill it.
         if (compressed.remaining() < HEADER_SIZE) {
           compressed.flip();
-          receiver.output(compressed);
+          outputBuffer(compressed);
           compressed = overflow;
           overflow = null;
         }
@@ -203,7 +286,7 @@ public class OutStream extends PositionedOutputStream {
         if (sizePosn != 0) {
           compressed.position(sizePosn);
           compressed.flip();
-          receiver.output(compressed);
+          outputBuffer(compressed);
           compressed = null;
           // if we have an overflow, clear it and make it the new compress
           // buffer
@@ -223,14 +306,14 @@ public class OutStream extends PositionedOutputStream {
         current.position(0);
         // update the header with the current length
         writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
-        receiver.output(current);
+        outputBuffer(current);
         getNewInputBuffer();
       }
     }
   }
 
   @Override
-  public void getPosition(PositionRecorder recorder) throws IOException {
+  public void getPosition(PositionRecorder recorder) {
     if (codec == null) {
       recorder.addPosition(uncompressedBytes);
     } else {
@@ -244,7 +327,10 @@ public class OutStream extends PositionedOutputStream {
     spill();
     if (compressed != null && compressed.position() != 0) {
       compressed.flip();
-      receiver.output(compressed);
+      outputBuffer(compressed);
+    }
+    if (cipher != null) {
+      finishEncryption();
     }
     compressed = null;
     uncompressedBytes = 0;

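On the write side, here is a minimal sketch (assumptions: the Collector receiver, key, and IV below are invented for illustration, and PhysicalWriter.OutputReceiver requires only output() and suppress()) of how the new constructor threads encryption through OutStream: each spilled buffer goes through outputBuffer(), which encrypts it before handing it to the receiver, and flush() finishes with finishEncryption().

package org.apache.orc.impl;

import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.impl.writer.StreamOptions;

import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class EncryptedOutStreamSketch {
  /** A receiver that just keeps the (already encrypted) buffers it is handed. */
  static class Collector implements PhysicalWriter.OutputReceiver {
    final List<ByteBuffer> chunks = new ArrayList<>();

    @Override
    public void output(ByteBuffer buffer) {
      chunks.add(buffer);
    }

    @Override
    public void suppress() {
      // nothing to suppress in this sketch
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] key = new byte[16];   // placeholder 128-bit key
    byte[] iv = new byte[16];    // placeholder per-stream IV
    StreamOptions options = new StreamOptions(1024)
        .withEncryption(EncryptionAlgorithm.AES_128,
            new SecretKeySpec(key, "AES"), iv);
    Collector collector = new Collector();
    OutStream out = new OutStream("sketch", options, collector);
    out.write("hello orc".getBytes(StandardCharsets.UTF_8));
    out.flush();   // collector.chunks now holds the encrypted bytes
  }
}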
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index bba580f..3919988 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.io.Text;
 import org.apache.orc.OrcProto;
@@ -275,31 +274,6 @@ public class ReaderImpl implements Reader {
   }
 
   /**
-   * Ensure this is an ORC file to prevent users from trying to read text
-   * files or RC files as ORC files.
-   * @param psLen the postscript length
-   * @param buffer the tail of the file
-   */
-  protected static void ensureOrcFooter(ByteBuffer buffer, int psLen) throws IOException {
-    int magicLength = OrcFile.MAGIC.length();
-    int fullLength = magicLength + 1;
-    if (psLen < fullLength || buffer.remaining() < fullLength) {
-      throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
-    }
-
-    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
-    byte[] array = buffer.array();
-    // now look for the magic string at the end of the postscript.
-    if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
-      // if it isn't there, this may be 0.11.0 version of the ORC file.
-      // Read the first 3 bytes from the buffer to check for the header
-      if (!Text.decode(buffer.array(), 0, magicLength).equals(OrcFile.MAGIC)) {
-        throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
-      }
-    }
-  }
-
-  /**
    * Build a version string out of an array.
    * @param version the version number as a list
    * @return the human readable form of the version string
@@ -405,26 +379,20 @@ public class ReaderImpl implements Reader {
     return OrcFile.WriterVersion.FUTURE;
   }
 
-  static List<DiskRange> singleton(DiskRange item) {
-    List<DiskRange> result = new ArrayList<>();
-    result.add(item);
-    return result;
-  }
-
   private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
-      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+      int footerSize, InStream.StreamOptions options) throws IOException {
     bb.position(footerAbsPos);
     bb.limit(footerAbsPos + footerSize);
-    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
-        singleton(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream(
+        InStream.create("footer", new BufferChunk(bb, 0), footerSize, options)));
   }
 
   public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
-      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+      int metadataSize, InStream.StreamOptions options) throws IOException {
     bb.position(metadataAbsPos);
     bb.limit(metadataAbsPos + metadataSize);
-    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
-        singleton(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream(
+        InStream.create("metadata", new BufferChunk(bb, 0), metadataSize, options)));
   }
 
   private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
@@ -450,41 +418,6 @@ public class ReaderImpl implements Reader {
     return ps;
   }
 
-  public static OrcTail extractFileTail(ByteBuffer buffer)
-      throws IOException {
-    return extractFileTail(buffer, -1, -1);
-  }
-
-  public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long modificationTime)
-      throws IOException {
-    int readSize = buffer.limit();
-    int psLen = buffer.get(readSize - 1) & 0xff;
-    int psOffset = readSize - 1 - psLen;
-    ensureOrcFooter(buffer, psLen);
-    byte[] psBuffer = new byte[psLen];
-    System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
-    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
-    int footerSize = (int) ps.getFooterLength();
-    CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
-    OrcProto.FileTail.Builder fileTailBuilder;
-    CompressionCodec codec = OrcCodecPool.getCodec(kind);
-    try {
-      OrcProto.Footer footer = extractFooter(buffer,
-        (int) (buffer.position() + ps.getMetadataLength()),
-        footerSize, codec, (int) ps.getCompressionBlockSize());
-      fileTailBuilder = OrcProto.FileTail.newBuilder()
-        .setPostscriptLength(psLen)
-        .setPostscript(ps)
-        .setFooter(footer)
-        .setFileLength(fileLength);
-    } finally {
-      OrcCodecPool.returnCodec(kind, codec);
-    }
-    // clear does not clear the contents but sets position to 0 and limit = capacity
-    buffer.clear();
-    return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
-  }
-
   /**
    * Build a virtual OrcTail for empty files.
    * @return a new OrcTail
@@ -599,7 +532,8 @@ public class ReaderImpl implements Reader {
       OrcProto.Footer footer;
       CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
       try {
-        footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize);
+        footer = extractFooter(footerBuffer, 0, footerSize,
+            InStream.options().withCodec(codec).withBufferSize(bufferSize));
       } finally {
         OrcCodecPool.returnCodec(compressionKind, codec);
       }
@@ -788,7 +722,8 @@ public class ReaderImpl implements Reader {
     if (metadata == null) {
       CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
       try {
-        metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+        metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize,
+            InStream.options().withCodec(codec).withBufferSize(bufferSize));
       } finally {
         OrcCodecPool.returnCodec(compressionKind, codec);
       }
@@ -803,10 +738,6 @@ public class ReaderImpl implements Reader {
     return result;
   }
 
-  public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
-    return userMetadata;
-  }
-
   @Override
   public List<Integer> getVersionList() {
     return versionList;

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 8bb4d9a..a8e0be1 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -1160,6 +1160,8 @@ public class RecordReaderImpl implements RecordReader {
       int bufferSize,
       Map<StreamName, InStream> streams) throws IOException {
     long streamOffset = 0;
+    InStream.StreamOptions options = InStream.options().withCodec(codec)
+        .withBufferSize(bufferSize);
     for (OrcProto.Stream streamDesc : streamDescriptions) {
       int column = streamDesc.getColumn();
       if ((includeColumn != null &&
@@ -1169,11 +1171,11 @@ public class RecordReaderImpl implements RecordReader {
         streamOffset += streamDesc.getLength();
         continue;
       }
-      List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+      DiskRangeList buffers = RecordReaderUtils.getStreamBuffers(
           ranges, streamOffset, streamDesc.getLength());
       StreamName name = new StreamName(column, streamDesc.getKind());
       streams.put(name, InStream.create(name.toString(), buffers,
-          streamDesc.getLength(), codec, bufferSize));
+          streamDesc.getLength(), options));
       streamOffset += streamDesc.getLength();
     }
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 705e768..a70c988 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -150,8 +150,7 @@ public class RecordReaderUtils {
     private final FileSystem fs;
     private final Path path;
     private final boolean useZeroCopy;
-    private CompressionCodec codec;
-    private final int bufferSize;
+    private InStream.StreamOptions options = InStream.options();
     private final int typeCount;
     private CompressionKind compressionKind;
 
@@ -160,8 +159,8 @@ public class RecordReaderUtils {
       this.path = properties.getPath();
       this.useZeroCopy = properties.getZeroCopy();
       this.compressionKind = properties.getCompression();
-      this.codec = OrcCodecPool.getCodec(compressionKind);
-      this.bufferSize = properties.getBufferSize();
+      options.withCodec(OrcCodecPool.getCodec(compressionKind))
+          .withBufferSize(properties.getBufferSize());
       this.typeCount = properties.getTypeCount();
     }
 
@@ -171,7 +170,7 @@ public class RecordReaderUtils {
       if (useZeroCopy) {
         // ZCR only uses codec for boolean checks.
         pool = new ByteBufferAllocatorPool();
-        zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+        zcr = RecordReaderUtils.createZeroCopyShim(file, options.getCodec(), pool);
       } else {
         zcr = null;
       }
@@ -228,9 +227,9 @@ public class RecordReaderUtils {
                 bb.position((int) (offset - range.getOffset()));
                 bb.limit((int) (bb.position() + stream.getLength()));
                 indexes[column] = OrcProto.RowIndex.parseFrom(
-                    InStream.createCodedInputStream("index",
-                        ReaderImpl.singleton(new BufferChunk(bb, 0)),
-                        stream.getLength(), codec, bufferSize));
+                    InStream.createCodedInputStream(InStream.create("index",
+                        new BufferChunk(bb, 0),
+                        stream.getLength(), options)));
               }
               break;
             case BLOOM_FILTER:
@@ -240,9 +239,9 @@ public class RecordReaderUtils {
                 bb.position((int) (offset - range.getOffset()));
                 bb.limit((int) (bb.position() + stream.getLength()));
                 bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
-                    (InStream.createCodedInputStream("bloom_filter",
-                        ReaderImpl.singleton(new BufferChunk(bb, 0)),
-                    stream.getLength(), codec, bufferSize));
+                    (InStream.createCodedInputStream(InStream.create(
+                        "bloom_filter", new BufferChunk(bb, 0),
+                        stream.getLength(), options)));
               }
               break;
             default:
@@ -266,8 +265,8 @@ public class RecordReaderUtils {
       ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
       file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
       return OrcProto.StripeFooter.parseFrom(
-          InStream.createCodedInputStream("footer", ReaderImpl.singleton(
-              new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize));
+          InStream.createCodedInputStream(InStream.create("footer",
+              new BufferChunk(tailBuf, 0), tailLength, options)));
     }
 
     @Override
@@ -278,9 +277,9 @@ public class RecordReaderUtils {
 
     @Override
     public void close() throws IOException {
-      if (codec != null) {
-        OrcCodecPool.returnCodec(compressionKind, codec);
-        codec = null;
+      if (options.getCodec() != null) {
+        OrcCodecPool.returnCodec(compressionKind, options.getCodec());
+        options.withCodec(null);
       }
       if (pool != null) {
         pool.clear();
@@ -313,9 +312,9 @@ public class RecordReaderUtils {
       }
       try {
         DefaultDataReader clone = (DefaultDataReader) super.clone();
-        if (codec != null) {
+        if (options.getCodec() != null) {
           // Make sure we don't share the same codec between two readers.
-          clone.codec = OrcCodecPool.getCodec(clone.compressionKind);
+          clone.options = options.clone();
         }
         return clone;
       } catch (CloneNotSupportedException e) {
@@ -325,7 +324,7 @@ public class RecordReaderUtils {
 
     @Override
     public CompressionCodec getCompressionCodec() {
-      return codec;
+      return options.getCodec();
     }
   }
 
@@ -572,42 +571,49 @@ public class RecordReaderUtils {
   }
 
 
-  static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+  static DiskRangeList getStreamBuffers(DiskRangeList range, long offset,
+                                        long length) {
     // This assumes sorted ranges (as do many other parts of ORC code.
-    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
-    if (length == 0) return buffers;
-    long streamEnd = offset + length;
-    boolean inRange = false;
-    while (range != null) {
-      if (!inRange) {
-        if (range.getEnd() <= offset) {
-          range = range.next;
-          continue; // Skip until we are in range.
+    BufferChunkList result = new BufferChunkList();
+    if (length != 0) {
+      long streamEnd = offset + length;
+      boolean inRange = false;
+      while (range != null) {
+        if (!inRange) {
+          if (range.getEnd() <= offset) {
+            range = range.next;
+            continue; // Skip until we are in range.
+          }
+          inRange = true;
+          if (range.getOffset() < offset) {
+            // Partial first buffer, add a slice of it.
+            result.add((BufferChunk) range.sliceAndShift(offset,
+                Math.min(streamEnd, range.getEnd()), -offset));
+            if (range.getEnd() >= streamEnd)
+              break; // Partial first buffer is also partial last buffer.
+            range = range.next;
+            continue;
+          }
+        } else if (range.getOffset() >= streamEnd) {
+          break;
         }
-        inRange = true;
-        if (range.getOffset() < offset) {
-          // Partial first buffer, add a slice of it.
-          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
-          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
-          range = range.next;
-          continue;
+        if (range.getEnd() > streamEnd) {
+          // Partial last buffer (may also be the first buffer), add a slice of it.
+          result.add((BufferChunk) range.sliceAndShift(range.getOffset(),
+              streamEnd, -offset));
+          break;
         }
-      } else if (range.getOffset() >= streamEnd) {
-        break;
-      }
-      if (range.getEnd() > streamEnd) {
-        // Partial last buffer (may also be the first buffer), add a slice of it.
-        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
-        break;
+        // Buffer that belongs entirely to one stream.
+        // TODO: ideally we would want to reuse the object and remove it from
+        //       the list, but we cannot because bufferChunks is also used by
+        //       clearStreams for zcr. Create a useless dup.
+        result.add((BufferChunk) range.sliceAndShift(range.getOffset(),
+            range.getEnd(), -offset));
+        if (range.getEnd() == streamEnd) break;
+        range = range.next;
       }
-      // Buffer that belongs entirely to one stream.
-      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
-      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
-      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
-      if (range.getEnd() == streamEnd) break;
-      range = range.next;
     }
-    return buffers;
+    return result.get();
   }
 
   static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,

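The new getStreamBuffers() returns a DiskRangeList of BufferChunks whose offsets are rebased to the start of the stream, so InStream can read straight across chunk boundaries. Below is a small sketch of the same shape built by hand; the class and method names are invented, BufferChunkList and its add()/get() calls are the ones used above, and the sketch sits in org.apache.orc.impl in case those helpers are package-private.

package org.apache.orc.impl;

import java.io.IOException;
import java.nio.ByteBuffer;

public class BufferChunkListSketch {
  /** Stitch two byte arrays into one logical uncompressed stream. */
  static InStream concat(byte[] first, byte[] second) throws IOException {
    BufferChunkList chunks = new BufferChunkList();
    // Offsets are positions within the stream, so the second chunk
    // starts where the first one ends.
    chunks.add(new BufferChunk(ByteBuffer.wrap(first), 0));
    chunks.add(new BufferChunk(ByteBuffer.wrap(second), first.length));
    return InStream.create("sketch", chunks.get(), first.length + second.length);
  }
}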
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java b/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
deleted file mode 100644
index da92c62..0000000
--- a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.common.DiskRangeInfo;
-import org.apache.hadoop.hive.common.io.DiskRange;
-
-/**
- * An uncompressed stream whose underlying byte buffer can be set.
- */
-public class SettableUncompressedStream extends InStream.UncompressedStream {
-
-  public SettableUncompressedStream(String name, List<DiskRange> input, long length) {
-    super(name, input, length);
-    setOffset(input);
-  }
-
-  public void setBuffers(DiskRangeInfo diskRangeInfo) {
-    reset(diskRangeInfo.getDiskRanges(), diskRangeInfo.getTotalLength());
-    setOffset(diskRangeInfo.getDiskRanges());
-  }
-
-  private void setOffset(List<DiskRange> list) {
-    currentOffset = list.isEmpty() ? 0 : list.get(0).getOffset();
-  }
-}

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index 452f315..9269eb6 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
 
 import io.airlift.compress.snappy.SnappyCompressor;
 import io.airlift.compress.snappy.SnappyDecompressor;
+import org.apache.orc.CompressionKind;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -32,7 +33,7 @@ public class SnappyCodec extends AircompressorCodec
   HadoopShims.DirectDecompressor decompressShim = null;
 
   SnappyCodec() {
-    super(new SnappyCompressor(), new SnappyDecompressor());
+    super(CompressionKind.SNAPPY, new SnappyCompressor(), new SnappyDecompressor());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d6239f2..4c3c548 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -236,10 +236,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
       case SNAPPY:
         return new SnappyCodec();
       case LZO:
-        return new AircompressorCodec(new LzoCompressor(),
+        return new AircompressorCodec(kind, new LzoCompressor(),
             new LzoDecompressor());
       case LZ4:
-        return new AircompressorCodec(new Lz4Compressor(),
+        return new AircompressorCodec(kind, new Lz4Compressor(),
             new Lz4Decompressor());
       default:
         throw new IllegalArgumentException("Unknown compression codec: " +

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index de1a6bf..19a3728 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -25,6 +25,7 @@ import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
 
 public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
   private static final HadoopShims SHIMS = HadoopShimsFactory.get();
@@ -187,4 +188,9 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
       decompressShim.end();
     }
   }
+
+  @Override
+  public CompressionKind getKind() {
+    return CompressionKind.ZLIB;
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
new file mode 100644
index 0000000..3d0c48a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+
+import java.security.Key;
+
+/**
+ * The compression and encryption options for writing a stream.
+ */
+public class StreamOptions {
+  private CompressionCodec codec;
+  private final int bufferSize;
+  private EncryptionAlgorithm algorithm;
+  private Key key;
+  private byte[] iv;
+
+  /**
+   * An option object with the given buffer size set.
+   * @param bufferSize the size of the buffers.
+   */
+  public StreamOptions(int bufferSize) {
+    this.bufferSize = bufferSize;
+  }
+
+  /**
+   * Compress using the given codec.
+   * @param codec the codec to compress with
+   * @return this
+   */
+  public StreamOptions withCodec(CompressionCodec codec) {
+    this.codec = codec;
+    return this;
+  }
+
+  public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
+                                      Key key,
+                                      byte[] iv) {
+    this.algorithm = algorithm;
+    this.key = key;
+    this.iv = iv;
+    return this;
+  }
+
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public boolean isEncrypted() {
+    return key != null;
+  }
+
+  public Key getKey() {
+    return key;
+  }
+
+  public EncryptionAlgorithm getAlgorithm() {
+    return algorithm;
+  }
+
+  public byte[] getIv() {
+    return iv;
+  }
+}

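A short sketch of building writer-side options from the class above that both compress and encrypt; the codec-pool handling mirrors the reader-side changes earlier in this patch, and the key and IV are placeholders.

package org.apache.orc.impl;

import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.impl.writer.StreamOptions;

import javax.crypto.spec.SecretKeySpec;

public class WriterStreamOptionsSketch {
  static StreamOptions zlibWithAes(byte[] keyBytes, byte[] iv) {
    CompressionCodec codec = OrcCodecPool.getCodec(CompressionKind.ZLIB);
    StreamOptions options = new StreamOptions(256 * 1024)
        .withCodec(codec)
        .withEncryption(EncryptionAlgorithm.AES_128,
            new SecretKeySpec(keyBytes, "AES"), iv);
    // isEncrypted() now reports true, so OutStream will compress each buffer
    // and then encrypt it before handing it to its receiver.
    return options;
  }
}

When the stream is done, the pooled codec should go back via OrcCodecPool.returnCodec(CompressionKind.ZLIB, options.getCodec()), matching the reader-side pattern shown earlier.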
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index f7a2a5c..03c379e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -48,8 +48,8 @@ public class TestBitFieldReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     BitFieldReader in = new BitFieldReader(InStream.create("test",
-        new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
-        codec, 500));
+        new BufferChunk(inBuf, 0), inBuf.remaining(),
+        InStream.options().withCodec(codec).withBufferSize(500)));
     for(int i=0; i < COUNT; ++i) {
       int x = in.next();
       if (i < COUNT / 2) {
@@ -96,8 +96,8 @@ public class TestBitFieldReader {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
-        new long[]{0}, inBuf.remaining(), null, 100));
+    BitFieldReader in = new BitFieldReader(InStream.create("test",
+        new BufferChunk(inBuf, 0), inBuf.remaining()));
     for(int i=0; i < COUNT; i += 5) {
       int x = (int) in.next();
       if (i < COUNT/2) {
@@ -133,8 +133,8 @@ public class TestBitFieldReader {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
-        new long[]{0}, inBuf.remaining(), null, 100));
+    BitFieldReader in = new BitFieldReader(InStream.create("test",
+        new BufferChunk(inBuf, 0), inBuf.remaining()));
     in.seek(posn);
     in.skip(10);
     for(int r = 210; r < COUNT; ++r) {

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestBitPack.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index f2d3d64..3eba3e6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -108,8 +108,9 @@ public class TestBitPack {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     long[] buff = new long[SIZE];
-    utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf },
-        new long[] { 0 }, inBuf.remaining(), null, SIZE));
+    utils.readInts(buff, 0, SIZE, fixedWidth,
+        InStream.create("test", new BufferChunk(inBuf,0),
+        inBuf.remaining()));
     for (int i = 0; i < SIZE; i++) {
       buff[i] = utils.zigzagDecode(buff[i]);
     }

http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java b/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
new file mode 100644
index 0000000..203d3e7
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestCryptoUtils {
+
+  @Test
+  public void testCreateStreamIv() throws Exception {
+    byte[] iv = CryptoUtils.createIvForStream(EncryptionAlgorithm.AES_128,
+        new StreamName(0x234567,
+        OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), 0x123456);
+    assertEquals(16, iv.length);
+    assertEquals(0x23, iv[0]);
+    assertEquals(0x45, iv[1]);
+    assertEquals(0x67, iv[2]);
+    assertEquals(0x0, iv[3]);
+    assertEquals(0x8, iv[4]);
+    assertEquals(0x12, iv[5]);
+    assertEquals(0x34, iv[6]);
+    assertEquals(0x56, iv[7]);
+  }
+}
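Reading the assertions back: the 16-byte IV appears to start with three bytes of column id (0x234567 -> 0x23, 0x45, 0x67), two bytes of stream kind (BLOOM_FILTER_UTF8 = 8 -> 0x00, 0x08), and three bytes of the third argument, presumably a stripe id (0x123456 -> 0x12, 0x34, 0x56); the remaining eight bytes are not asserted here. This is an interpretation of the test, not a statement of the CryptoUtils contract.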