You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/07/03 16:32:35 UTC

[orc] branch master updated: ORC-516: Update InStream for column compression.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7afd89a  ORC-516: Update InStream for column compression.
7afd89a is described below

commit 7afd89a855a1e19811371caaaf168c4cac1e2e3f
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon Jun 24 13:05:55 2019 -0700

    ORC-516: Update InStream for column compression.
    
    Fixes #407
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 .../src/java/org/apache/orc/impl/BufferChunk.java  |  29 +-
 .../java/org/apache/orc/impl/BufferChunkList.java  |  14 +
 .../src/java/org/apache/orc/impl/InStream.java     | 297 +++++++++++++--------
 .../src/java/org/apache/orc/impl/ReaderImpl.java   |   4 +-
 .../java/org/apache/orc/impl/RecordReaderImpl.java |   4 +-
 .../org/apache/orc/impl/RecordReaderUtils.java     |   6 +-
 .../org/apache/orc/impl/TestBitFieldReader.java    |   6 +-
 .../src/test/org/apache/orc/impl/TestBitPack.java  |   2 +-
 .../src/test/org/apache/orc/impl/TestInStream.java |  26 +-
 .../orc/impl/TestIntegerCompressionReader.java     |   4 +-
 .../test/org/apache/orc/impl/TestOutStream.java    |  15 +-
 .../org/apache/orc/impl/TestPhysicalFsWriter.java  |  19 +-
 .../apache/orc/impl/TestRunLengthByteReader.java   |   6 +-
 .../orc/impl/TestRunLengthIntegerReader.java       |   8 +-
 .../org/apache/orc/impl/TestSchemaEvolution.java   |  15 +-
 15 files changed, 282 insertions(+), 173 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunk.java b/java/core/src/java/org/apache/orc/impl/BufferChunk.java
index afde82f..951a6d8 100644
--- a/java/core/src/java/org/apache/orc/impl/BufferChunk.java
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunk.java
@@ -1,6 +1,4 @@
-package org.apache.orc.impl;
-
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +16,8 @@ package org.apache.orc.impl;
  * limitations under the License.
  */
 
+package org.apache.orc.impl;
+
 import org.apache.hadoop.hive.common.io.DiskRange;
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.slf4j.Logger;
@@ -34,15 +34,20 @@ public class BufferChunk extends DiskRangeList {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(BufferChunk.class);
-  final ByteBuffer chunk;
+  private ByteBuffer chunk;
+
+  public BufferChunk(long offset, int length) {
+    super(offset, offset + length);
+    chunk = null;
+  }
 
   public BufferChunk(ByteBuffer chunk, long offset) {
     super(offset, offset + chunk.remaining());
     this.chunk = chunk;
   }
 
-  public ByteBuffer getChunk() {
-    return chunk;
+  public void setChunk(ByteBuffer chunk) {
+    this.chunk = chunk;
   }
 
   @Override
@@ -52,10 +57,14 @@ public class BufferChunk extends DiskRangeList {
 
   @Override
   public final String toString() {
-    boolean makesSense = chunk.remaining() == (end - offset);
-    return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
-        + (makesSense ? "" : "(!)") + " type: " +
-        (chunk.isDirect() ? "direct" : "array-backed");
+    if (chunk == null) {
+      return "data range[" + offset + ", " + end +")";
+    } else {
+      boolean makesSense = chunk.remaining() == (end - offset);
+      return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
+                 + (makesSense ? "" : "(!)") + " type: " +
+                 (chunk.isDirect() ? "direct" : "array-backed");
+    }
   }
 
   @Override
diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
index d8a89db..1a0aea9 100644
--- a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
@@ -32,6 +32,7 @@ public class BufferChunkList {
     } else {
       tail.next = value;
       value.prev = tail;
+      value.next = null;
       tail = value;
     }
   }
@@ -40,6 +41,19 @@ public class BufferChunkList {
     return head;
   }
 
+  /**
+   * Get the nth element of the list
+   * @param chunk the element number to get from 0
+   * @return the given element number
+   */
+  public BufferChunk get(int chunk) {
+    BufferChunk ptr = head;
+    for(int i=0; i < chunk; ++i) {
+      ptr = ptr == null ? null : (BufferChunk) ptr.next;
+    }
+    return ptr;
+  }
+
   public void clear() {
     head = null;
     tail = null;
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 06f439e..dd4429e 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.security.InvalidAlgorithmParameterException;
 import java.security.InvalidKeyException;
 import java.security.Key;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.hive.common.io.DiskRangeList;
 import org.apache.orc.CompressionCodec;
@@ -41,25 +42,25 @@ public abstract class InStream extends InputStream {
   private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
   public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
 
-  protected final String name;
-  protected long length;
+  protected final Object name;
+  protected final long offset;
+  protected final long length;
 
-  public InStream(String name, long length) {
+  public InStream(Object name, long offset, long length) {
     this.name = name;
+    this.offset = offset;
     this.length = length;
   }
 
-  public String getStreamName() {
-    return name;
-  }
-
-  public long getStreamLength() {
-    return length;
+  public String toString() {
+    return name.toString();
   }
 
   @Override
   public abstract void close();
 
+  public abstract void changeIv(Consumer<byte[]> modifier);
+
   static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
     int result = 0;
     DiskRangeList range = list;
@@ -75,8 +76,8 @@ public abstract class InStream extends InputStream {
    */
   public static class UncompressedStream extends InStream {
     private DiskRangeList bytes;
-    private long length;
-    protected long currentOffset;
+    // position in the stream (0..length)
+    protected long position;
     protected ByteBuffer decrypted;
     protected DiskRangeList currentRange;
 
@@ -86,33 +87,33 @@ public abstract class InStream extends InputStream {
      * @param name name of the stream
      * @param length the number of bytes for the stream
      */
-    public UncompressedStream(String name, long length) {
-      super(name, length);
+    public UncompressedStream(Object name, long offset, long length) {
+      super(name, offset, length);
     }
 
-    public UncompressedStream(String name,
+    public UncompressedStream(Object name,
                               DiskRangeList input,
+                              long offset,
                               long length) {
-      super(name, length);
-      reset(input, length);
+      super(name, offset, length);
+      reset(input);
     }
 
-    protected void reset(DiskRangeList input, long length) {
+    protected void reset(DiskRangeList input) {
       this.bytes = input;
-      this.length = length;
-      currentOffset = input == null ? 0 : input.getOffset();
+      position = input == null ? 0 : input.getOffset() - offset;
       setCurrent(input, true);
     }
 
     @Override
     public int read() {
       if (decrypted == null || decrypted.remaining() == 0) {
-        if (currentOffset == length) {
+        if (position == length) {
           return -1;
         }
         setCurrent(currentRange.next, false);
       }
-      currentOffset += 1;
+      position += 1;
       return 0xff & decrypted.get();
     }
 
@@ -122,21 +123,21 @@ public abstract class InStream extends InputStream {
         decrypted = newRange.getData().slice();
         // Move the position in the ByteBuffer to match the currentOffset,
         // which is relative to the stream.
-        decrypted.position((int) (currentOffset - newRange.getOffset()));
+        decrypted.position((int) (position + offset - newRange.getOffset()));
       }
     }
 
     @Override
     public int read(byte[] data, int offset, int length) {
       if (decrypted == null || decrypted.remaining() == 0) {
-        if (currentOffset == this.length) {
+        if (position == this.length) {
           return -1;
         }
         setCurrent(currentRange.next, false);
       }
       int actualLength = Math.min(length, decrypted.remaining());
       decrypted.get(data, offset, actualLength);
-      currentOffset += actualLength;
+      position += actualLength;
       return actualLength;
     }
 
@@ -145,19 +146,24 @@ public abstract class InStream extends InputStream {
       if (decrypted != null && decrypted.remaining() > 0) {
         return decrypted.remaining();
       }
-      return (int) (length - currentOffset);
+      return (int) (length - position);
     }
 
     @Override
     public void close() {
       currentRange = null;
-      currentOffset = length;
+      position = length;
       // explicit de-ref of bytes[]
       decrypted = null;
       bytes = null;
     }
 
     @Override
+    public void changeIv(Consumer<byte[]> modifier) {
+      // nothing to do
+    }
+
+    @Override
     public void seek(PositionProvider index) throws IOException {
       seek(index.getNext());
     }
@@ -166,18 +172,20 @@ public abstract class InStream extends InputStream {
       if (desired == 0 && bytes == null) {
         return;
       }
+      // compute the position of the desired point in file
+      long posn = desired + offset;
       // If we are seeking inside of the current range, just reposition.
-      if (currentRange != null && desired >= currentRange.getOffset() &&
-          desired < currentRange.getEnd()) {
-        decrypted.position((int) (desired - currentRange.getOffset()));
-        currentOffset = desired;
+      if (currentRange != null && posn >= currentRange.getOffset() &&
+          posn < currentRange.getEnd()) {
+        decrypted.position((int) (posn - currentRange.getOffset()));
+        position = desired;
       } else {
         for (DiskRangeList curRange = bytes; curRange != null;
              curRange = curRange.next) {
-          if (curRange.getOffset() <= desired &&
-              (curRange.next == null ? desired <= curRange.getEnd() :
-                  desired < curRange.getEnd())) {
-            currentOffset = desired;
+          if (curRange.getOffset() <= posn &&
+              (curRange.next == null ? posn <= curRange.getEnd() :
+                  posn < curRange.getEnd())) {
+            position = desired;
             setCurrent(curRange, true);
             return;
           }
@@ -189,7 +197,7 @@ public abstract class InStream extends InputStream {
 
     @Override
     public String toString() {
-      return "uncompressed stream " + name + " position: " + currentOffset +
+      return "uncompressed stream " + name + " position: " + position +
           " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
           " offset: " + (decrypted == null ? 0 : decrypted.position()) +
           " limit: " + (decrypted == null ? 0 : decrypted.limit());
@@ -209,21 +217,39 @@ public abstract class InStream extends InputStream {
    * Manage the state of the decryption, including the ability to seek.
    */
   static class EncryptionState {
-    private final String name;
+    private final Object name;
     private final EncryptionAlgorithm algorithm;
     private final Key key;
     private final byte[] iv;
     private final Cipher cipher;
+    private final long offset;
     private ByteBuffer decrypted;
 
-    EncryptionState(String name, StreamOptions options) {
+    EncryptionState(Object name, long offset, StreamOptions options) {
       this.name = name;
-      algorithm = options.algorithm;
-      key = options.key;
-      iv = options.iv;
+      this.offset = offset;
+      algorithm = options.getAlgorithm();
+      key = options.getKey();
+      iv = options.getIv();
       cipher = algorithm.createCipher();
     }
 
+    void changeIv(Consumer<byte[]> modifier) {
+      modifier.accept(iv);
+      updateIv();
+      OutStream.logKeyAndIv(name, key, iv);
+    }
+
+    private void updateIv() {
+      try {
+        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+      } catch (InvalidKeyException e) {
+        throw new IllegalArgumentException("Invalid key on " + name, e);
+      } catch (InvalidAlgorithmParameterException e) {
+        throw new IllegalArgumentException("Invalid iv on " + name, e);
+      }
+    }
+
     /**
      * We are seeking to a new range, so update the cipher to change the IV
      * to match. This code assumes that we only support encryption in CTR mode.
@@ -233,35 +259,27 @@ public abstract class InStream extends InputStream {
       int blockSize = cipher.getBlockSize();
       long encryptionBlocks = offset / blockSize;
       long extra = offset % blockSize;
-      byte[] advancedIv;
-      if (encryptionBlocks == 0) {
-        advancedIv = iv;
-      } else {
+      CryptoUtils.clearCounter(iv);
+      if (encryptionBlocks != 0) {
         // Add the encryption blocks into the initial iv, to compensate for
         // skipping over decrypting those bytes.
-        advancedIv = new byte[iv.length];
-        System.arraycopy(iv, 0, advancedIv, 0, iv.length);
         int posn = iv.length - 1;
         while (encryptionBlocks > 0) {
-          long sum = (advancedIv[posn] & 0xff) + encryptionBlocks;
-          advancedIv[posn--] = (byte) sum;
+          long sum = (iv[posn] & 0xff) + encryptionBlocks;
+          iv[posn--] = (byte) sum;
           encryptionBlocks =  sum / 0x100;
         }
       }
-      try {
-        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(advancedIv));
-        // If the range starts at an offset that doesn't match the encryption
-        // block, we need to advance some bytes within an encryption block.
-        if (extra > 0) {
+      updateIv();
+      // If the range starts at an offset that doesn't match the encryption
+      // block, we need to advance some bytes within an encryption block.
+      if (extra > 0) {
+        try {
           byte[] wasted = new byte[(int) extra];
           cipher.update(wasted, 0, wasted.length, wasted, 0);
+        } catch (ShortBufferException e) {
+          throw new IllegalArgumentException("Short buffer in " + name, e);
         }
-      } catch (InvalidKeyException e) {
-        throw new IllegalArgumentException("Invalid key on " + name, e);
-      } catch (InvalidAlgorithmParameterException e) {
-        throw new IllegalArgumentException("Invalid iv on " + name, e);
-      } catch (ShortBufferException e) {
-        throw new IllegalArgumentException("Short buffer in " + name, e);
       }
     }
 
@@ -305,11 +323,11 @@ public abstract class InStream extends InputStream {
   public static class EncryptedStream extends UncompressedStream {
     private final EncryptionState encrypt;
 
-    public EncryptedStream(String name, DiskRangeList input, long length,
+    public EncryptedStream(Object name, DiskRangeList input, long offset, long length,
                            StreamOptions options) {
-      super(name, length);
-      encrypt = new EncryptionState(name, options);
-      reset(input, length);
+      super(name, offset, length);
+      encrypt = new EncryptionState(name, offset, options);
+      reset(input);
     }
 
     @Override
@@ -317,10 +335,10 @@ public abstract class InStream extends InputStream {
       currentRange = newRange;
       if (newRange != null) {
         if (isJump) {
-          encrypt.changeIv(newRange.getOffset());
+          encrypt.changeIv(newRange.getOffset() - offset);
         }
         decrypted = encrypt.decrypt(newRange);
-        decrypted.position((int) (currentOffset - newRange.getOffset()));
+        decrypted.position((int) (position + offset - newRange.getOffset()));
       }
     }
 
@@ -331,6 +349,11 @@ public abstract class InStream extends InputStream {
     }
 
     @Override
+    public void changeIv(Consumer<byte[]> modifier) {
+      encrypt.changeIv(modifier);
+    }
+
+    @Override
     public String toString() {
       return "encrypted " + super.toString();
     }
@@ -342,7 +365,7 @@ public abstract class InStream extends InputStream {
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
     protected ByteBuffer compressed;
-    protected long currentOffset;
+    protected long position;
     protected DiskRangeList currentRange;
     private boolean isUncompressedOriginal;
 
@@ -354,10 +377,11 @@ public abstract class InStream extends InputStream {
      * @param length the total number of bytes in the stream
      * @param options the options used to read the stream
      */
-    public CompressedStream(String name,
+    public CompressedStream(Object name,
+                            long offset,
                             long length,
                             StreamOptions options) {
-      super(name, length);
+      super(name, offset, length);
       this.codec = options.codec;
       this.bufferSize = options.bufferSize;
     }
@@ -369,25 +393,24 @@ public abstract class InStream extends InputStream {
      * @param length the total length of the stream
      * @param options the options to read the data with
      */
-    public CompressedStream(String name,
+    public CompressedStream(Object name,
                             DiskRangeList input,
+                            long offset,
                             long length,
                             StreamOptions options) {
-      super(name, length);
+      super(name, offset, length);
       this.codec = options.codec;
       this.bufferSize = options.bufferSize;
-      reset(input, length);
+      reset(input);
     }
 
     /**
      * Reset the input to a new set of data.
      * @param input the input data
-     * @param length the number of bytes in the stream
      */
-    void reset(DiskRangeList input, long length) {
+    void reset(DiskRangeList input) {
       bytes = input;
-      this.length = length;
-      currentOffset = input == null ? 0 : input.getOffset();
+      position = input == null ? 0 : input.getOffset() - offset;
       setCurrent(input, true);
     }
 
@@ -400,7 +423,7 @@ public abstract class InStream extends InputStream {
       currentRange = newRange;
       if (newRange != null) {
         compressed = newRange.getData().slice();
-        compressed.position((int) (currentOffset - newRange.getOffset()));
+        compressed.position((int) (position + offset - newRange.getOffset()));
       }
     }
 
@@ -417,11 +440,11 @@ public abstract class InStream extends InputStream {
 
         if (chunkLength > bufferSize) {
           throw new IllegalArgumentException("Buffer size too small. size = " +
-              bufferSize + " needed = " + chunkLength);
+              bufferSize + " needed = " + chunkLength + " in " + name);
         }
         // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
         assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
-        currentOffset += OutStream.HEADER_SIZE;
+        position += OutStream.HEADER_SIZE;
 
         ByteBuffer slice = this.slice(chunkLength);
 
@@ -464,7 +487,7 @@ public abstract class InStream extends InputStream {
 
     private boolean ensureUncompressed() throws IOException {
       while (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == this.length) {
+        if (position == this.length) {
           return false;
         }
         readHeader();
@@ -485,11 +508,16 @@ public abstract class InStream extends InputStream {
       uncompressed = null;
       compressed = null;
       currentRange = null;
-      currentOffset = length;
+      position = length;
       bytes = null;
     }
 
     @Override
+    public void changeIv(Consumer<byte[]> modifier) {
+      // nothing to do
+    }
+
+    @Override
     public void seek(PositionProvider index) throws IOException {
       seek(index.getNext());
       long uncompressedBytes = index.getNext();
@@ -507,13 +535,13 @@ public abstract class InStream extends InputStream {
     private ByteBuffer slice(int chunkLength) throws IOException {
       int len = chunkLength;
       final DiskRangeList oldRange = currentRange;
-      final long oldOffset = currentOffset;
+      final long oldPosition = position;
       ByteBuffer slice;
       if (compressed.remaining() >= len) {
         slice = compressed.slice();
         // simple case
         slice.limit(len);
-        currentOffset += len;
+        position += len;
         compressed.position(compressed.position() + len);
         return slice;
       } else if (currentRange.next == null) {
@@ -531,7 +559,7 @@ public abstract class InStream extends InputStream {
       // we need to consolidate 2 or more buffers into 1
       // first copy out compressed buffers
       ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
-      currentOffset += compressed.remaining();
+      position += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
 
@@ -544,18 +572,18 @@ public abstract class InStream extends InputStream {
           slice = compressed.slice();
           slice.limit(len);
           copy.put(slice);
-          currentOffset += len;
+          position += len;
           compressed.position(compressed.position() + len);
           copy.flip();
           return copy;
         }
-        currentOffset += compressed.remaining();
+        position += compressed.remaining();
         len -= compressed.remaining();
         copy.put(compressed);
       }
 
       // restore offsets for exception clarity
-      currentOffset = oldOffset;
+      position = oldPosition;
       setCurrent(oldRange, true);
       throw new IOException("EOF in " + this + " while trying to read " +
           chunkLength + " bytes");
@@ -565,11 +593,12 @@ public abstract class InStream extends InputStream {
       if (desired == 0 && bytes == null) {
         return;
       }
+      long posn = desired + offset;
       for (DiskRangeList range = bytes; range != null; range = range.next) {
-        if (range.getOffset() <= desired &&
-            (range.next == null ? desired <= range.getEnd() :
-              desired < range.getEnd())) {
-          currentOffset = desired;
+        if (range.getOffset() <= posn &&
+            (range.next == null ? posn <= range.getEnd() :
+              posn < range.getEnd())) {
+          position = desired;
           setCurrent(range, true);
           return;
         }
@@ -597,7 +626,7 @@ public abstract class InStream extends InputStream {
 
     @Override
     public String toString() {
-      return "compressed stream " + name + " position: " + currentOffset +
+      return "compressed stream " + name + " position: " + position +
           " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
           " offset: " + (compressed == null ? 0 : compressed.position()) +
           " limit: " + (compressed == null ? 0 : compressed.limit()) +
@@ -611,13 +640,14 @@ public abstract class InStream extends InputStream {
   private static class EncryptedCompressedStream extends CompressedStream {
     private final EncryptionState encrypt;
 
-    public EncryptedCompressedStream(String name,
+    public EncryptedCompressedStream(Object name,
                                      DiskRangeList input,
+                                     long offset,
                                      long length,
                                      StreamOptions options) {
-      super(name, length, options);
-      encrypt = new EncryptionState(name, options);
-      reset(input, length);
+      super(name, offset, length, options);
+      encrypt = new EncryptionState(name, offset, options);
+      reset(input);
     }
 
     @Override
@@ -625,10 +655,10 @@ public abstract class InStream extends InputStream {
       currentRange = newRange;
       if (newRange != null) {
         if (isJump) {
-          encrypt.changeIv(newRange.getOffset());
+          encrypt.changeIv(newRange.getOffset() - offset);
         }
         compressed = encrypt.decrypt(newRange);
-        compressed.position((int) (currentOffset - newRange.getOffset()));
+        compressed.position((int) (position + offset - newRange.getOffset()));
       }
     }
 
@@ -639,6 +669,11 @@ public abstract class InStream extends InputStream {
     }
 
     @Override
+    public void changeIv(Consumer<byte[]> modifier) {
+      encrypt.changeIv(modifier);
+    }
+
+    @Override
     public String toString() {
       return "encrypted " + super.toString();
     }
@@ -653,6 +688,17 @@ public abstract class InStream extends InputStream {
     private Key key;
     private byte[] iv;
 
+    public StreamOptions(StreamOptions other) {
+      codec = other.codec;
+      bufferSize = other.bufferSize;
+      algorithm = other.algorithm;
+      key = other.key;
+      iv = other.iv == null ? null : other.iv.clone();
+    }
+
+    public StreamOptions() {
+    }
+
     public StreamOptions withCodec(CompressionCodec value) {
       this.codec = value;
       return this;
@@ -672,10 +718,30 @@ public abstract class InStream extends InputStream {
       return this;
     }
 
+    public boolean isCompressed() {
+      return codec != null;
+    }
+
     public CompressionCodec getCodec() {
       return codec;
     }
 
+    public int getBufferSize() {
+      return bufferSize;
+    }
+
+    public EncryptionAlgorithm getAlgorithm() {
+      return algorithm;
+    }
+
+    public Key getKey() {
+      return key;
+    }
+
+    public byte[] getIv() {
+      return iv;
+    }
+
     @Override
     public StreamOptions clone() {
       try {
@@ -689,6 +755,20 @@ public abstract class InStream extends InputStream {
         throw new UnsupportedOperationException("uncloneable", e);
       }
     }
+
+    @Override
+    public String toString() {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("compress: ");
+      buffer.append(codec == null ? "none" : codec.getKind());
+      buffer.append(", buffer size: ");
+      buffer.append(bufferSize);
+      if (key != null) {
+        buffer.append(", encryption: ");
+        buffer.append(algorithm);
+      }
+      return buffer.toString();
+    }
   }
 
   public static StreamOptions options() {
@@ -699,24 +779,30 @@ public abstract class InStream extends InputStream {
    * Create an input stream from a list of disk ranges with data.
    * @param name the name of the stream
    * @param input the list of ranges of bytes for the stream; from disk or cache
+   * @param offset the first byte offset of the stream
    * @param length the length in bytes of the stream
    * @param options the options to read with
    * @return an input stream
    */
-  public static InStream create(String name,
+  public static InStream create(Object name,
                                 DiskRangeList input,
+                                long offset,
                                 long length,
                                 StreamOptions options) {
+    LOG.debug("Reading {} with {} from {} for {}", name, options, offset,
+        length);
     if (options == null || options.codec == null) {
       if (options == null || options.key == null) {
-        return new UncompressedStream(name, input, length);
+        return new UncompressedStream(name, input, offset, length);
       } else {
-        return new EncryptedStream(name, input, length, options);
+        OutStream.logKeyAndIv(name, options.getKey(), options.getIv());
+        return new EncryptedStream(name, input, offset, length, options);
       }
     } else if (options.key == null) {
-      return new CompressedStream(name, input, length, options);
+      return new CompressedStream(name, input, offset, length, options);
     } else {
-      return new EncryptedCompressedStream(name, input, length, options);
+      OutStream.logKeyAndIv(name, options.getKey(), options.getIv());
+      return new EncryptedCompressedStream(name, input, offset, length, options);
     }
   }
 
@@ -727,10 +813,11 @@ public abstract class InStream extends InputStream {
    * @param length the length in bytes of the stream
    * @return an input stream
    */
-  public static InStream create(String name,
+  public static InStream create(Object name,
                                 DiskRangeList input,
-                                long length) throws IOException {
-    return create(name, input, length, null);
+                                long offset,
+                                long length) {
+    return create(name, input, offset, length, null);
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 8edfaee..ddd53b7 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -412,7 +412,7 @@ public class ReaderImpl implements Reader {
     bb.position(footerAbsPos);
     bb.limit(footerAbsPos + footerSize);
     return OrcProto.Footer.parseFrom(InStream.createCodedInputStream(
-        InStream.create("footer", new BufferChunk(bb, 0), footerSize, options)));
+        InStream.create("footer", new BufferChunk(bb, 0), 0, footerSize, options)));
   }
 
   public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
@@ -420,7 +420,7 @@ public class ReaderImpl implements Reader {
     bb.position(metadataAbsPos);
     bb.limit(metadataAbsPos + metadataSize);
     return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream(
-        InStream.create("metadata", new BufferChunk(bb, 0), metadataSize, options)));
+        InStream.create("metadata", new BufferChunk(bb, 0), 0, metadataSize, options)));
   }
 
   private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index c1b9652..bf72982 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -1067,7 +1067,7 @@ public class RecordReaderImpl implements RecordReader {
           if (!(range instanceof BufferChunk)) {
             continue;
           }
-          dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+          dataReader.releaseBuffer(range.getData());
         }
       }
     }
@@ -1219,7 +1219,7 @@ public class RecordReaderImpl implements RecordReader {
           ranges, streamOffset, streamDesc.getLength());
       StreamName name = new StreamName(column, streamDesc.getKind());
       streams.put(name, InStream.create(name.toString(), buffers,
-          streamDesc.getLength(), options));
+          0, streamDesc.getLength(), options));
       streamOffset += streamDesc.getLength();
     }
   }
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index d4c57e5..aeb33d4 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -234,7 +234,7 @@ public class RecordReaderUtils {
                 indexes[column] = OrcProto.RowIndex.parseFrom(
                     InStream.createCodedInputStream(InStream.create("index",
                         new BufferChunk(bb, 0),
-                        stream.getLength(), options)));
+                        0, stream.getLength(), options)));
               }
               break;
             case BLOOM_FILTER:
@@ -246,7 +246,7 @@ public class RecordReaderUtils {
                 bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
                     (InStream.createCodedInputStream(InStream.create(
                         "bloom_filter", new BufferChunk(bb, 0),
-                        stream.getLength(), options)));
+                        0, stream.getLength(), options)));
               }
               break;
             default:
@@ -271,7 +271,7 @@ public class RecordReaderUtils {
       file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
       return OrcProto.StripeFooter.parseFrom(
           InStream.createCodedInputStream(InStream.create("footer",
-              new BufferChunk(tailBuf, 0), tailLength, options)));
+              new BufferChunk(tailBuf, 0), 0, tailLength, options)));
     }
 
     @Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index 4590f9b..6f54c77 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -53,7 +53,7 @@ public class TestBitFieldReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     BitFieldReader in = new BitFieldReader(InStream.create("test",
-        new BufferChunk(inBuf, 0), inBuf.remaining(),
+        new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
         InStream.options().withCodec(codec).withBufferSize(500)));
     for(int i=0; i < COUNT; ++i) {
       int x = in.next();
@@ -102,7 +102,7 @@ public class TestBitFieldReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     BitFieldReader in = new BitFieldReader(InStream.create("test",
-        new BufferChunk(inBuf, 0), inBuf.remaining()));
+        new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
     for(int i=0; i < COUNT; i += 5) {
       int x = in.next();
       if (i < COUNT/2) {
@@ -139,7 +139,7 @@ public class TestBitFieldReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     BitFieldReader in = new BitFieldReader(InStream.create("test",
-        new BufferChunk(inBuf, 0), inBuf.remaining()));
+        new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
     in.seek(posn);
     in.skip(10);
     for(int r = 210; r < COUNT; ++r) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index d298ecc..a26b130 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -110,7 +110,7 @@ public class TestBitPack {
     inBuf.flip();
     long[] buff = new long[SIZE];
     utils.readInts(buff, 0, SIZE, fixedWidth,
-        InStream.create("test", new BufferChunk(inBuf,0),
+        InStream.create("test", new BufferChunk(inBuf,0), 0,
         inBuf.remaining()));
     for (int i = 0; i < SIZE; i++) {
       buff[i] = utils.zigzagDecode(buff[i]);
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index 1881be3..3b6e28c 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -108,7 +108,7 @@ public class TestInStream {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
-        inBuf.remaining());
+        0, inBuf.remaining());
     assertEquals("uncompressed stream test position: 0 length: 1024" +
                  " range: 0 offset: 0 limit: 1024",
                  in.toString());
@@ -161,7 +161,7 @@ public class TestInStream {
       offset += size;
     }
 
-    InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+    InStream in = InStream.create("test", list.get(), 0, collect.buffer.size(),
         InStream.options().withEncryption(algorithm, decryptKey,
             writerOptions.getIv()));
     assertEquals("encrypted uncompressed stream test position: 0 length: 8192" +
@@ -219,7 +219,7 @@ public class TestInStream {
       offset += size;
     }
 
-    InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+    InStream in = InStream.create("test", list.get(), 0, collect.buffer.size(),
         InStream.options()
             .withCodec(new ZlibCodec()).withBufferSize(500)
             .withEncryption(algorithm, decryptKey, writerOptions.getIv()));
@@ -258,7 +258,7 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+    InStream in = InStream.create("test", new BufferChunk(inBuf, 0), 0,
         inBuf.remaining(),
         InStream.options().withCodec(codec).withBufferSize(300));
     assertEquals("compressed stream test position: 0 length: 961 range: 0" +
@@ -294,7 +294,7 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+    InStream in = InStream.create("test", new BufferChunk(inBuf, 0), 0,
         inBuf.remaining(),
         InStream.options().withCodec(codec).withBufferSize(100));
     byte[] contents = new byte[1024];
@@ -310,7 +310,7 @@ public class TestInStream {
     inBuf.put((byte) 32);
     inBuf.put((byte) 0);
     inBuf.flip();
-    in = InStream.create("test2", new BufferChunk(inBuf, 0),
+    in = InStream.create("test2", new BufferChunk(inBuf, 0), 0,
         inBuf.remaining(),
         InStream.options().withCodec(codec).withBufferSize(300));
     try {
@@ -354,7 +354,7 @@ public class TestInStream {
     }
     InStream.StreamOptions inOptions = InStream.options()
         .withCodec(codec).withBufferSize(400);
-    InStream in = InStream.create("test", buffers.get(), 1674, inOptions);
+    InStream in = InStream.create("test", buffers.get(), 0, 1674, inOptions);
     assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
                  " offset: 0 limit: 483 range 0 = 0 to 483;" +
                  "  range 1 = 483 to 1625;  range 2 = 1625 to 1674",
@@ -373,7 +373,7 @@ public class TestInStream {
     buffers.clear();
     buffers.add(new BufferChunk(inBuf[1], 483));
     buffers.add(new BufferChunk(inBuf[2], 1625));
-    in = InStream.create("test", buffers.get(), 1674, inOptions);
+    in = InStream.create("test", buffers.get(), 0, 1674, inOptions);
     inStream = new DataInputStream(in);
     positions[303].reset();
     in.seek(positions[303]);
@@ -384,7 +384,7 @@ public class TestInStream {
     buffers.clear();
     buffers.add(new BufferChunk(inBuf[0], 0));
     buffers.add(new BufferChunk(inBuf[2], 1625));
-    in = InStream.create("test", buffers.get(), 1674, inOptions);
+    in = InStream.create("test", buffers.get(), 0, 1674, inOptions);
     inStream = new DataInputStream(in);
     positions[1001].reset();
     for(int i=0; i < 300; ++i) {
@@ -424,7 +424,7 @@ public class TestInStream {
     buffers.add(new BufferChunk(inBuf[0], 0));
     buffers.add(new BufferChunk(inBuf[1], 1024));
     buffers.add(new BufferChunk(inBuf[2], 3072));
-    InStream in = InStream.create("test", buffers.get(), 4096);
+    InStream in = InStream.create("test", buffers.get(), 0, 4096);
     assertEquals("uncompressed stream test position: 0 length: 4096" +
                  " range: 0 offset: 0 limit: 1024",
                  in.toString());
@@ -442,7 +442,7 @@ public class TestInStream {
     buffers.clear();
     buffers.add(new BufferChunk(inBuf[1], 1024));
     buffers.add(new BufferChunk(inBuf[2], 3072));
-    in = InStream.create("test", buffers.get(), 4096);
+    in = InStream.create("test", buffers.get(), 0, 4096);
     inStream = new DataInputStream(in);
     positions[256].reset();
     in.seek(positions[256]);
@@ -453,7 +453,7 @@ public class TestInStream {
     buffers.clear();
     buffers.add(new BufferChunk(inBuf[0], 0));
     buffers.add(new BufferChunk(inBuf[2], 3072));
-    in = InStream.create("test", buffers.get(), 4096);
+    in = InStream.create("test", buffers.get(), 0, 4096);
     inStream = new DataInputStream(in);
     positions[768].reset();
     for(int i=0; i < 256; ++i) {
@@ -468,7 +468,7 @@ public class TestInStream {
   @Test
   public void testEmptyDiskRange() throws IOException {
     DiskRangeList range = new BufferChunk(ByteBuffer.allocate(0), 0);
-    InStream stream = new InStream.UncompressedStream("test", range, 0);
+    InStream stream = new InStream.UncompressedStream("test", range, 0, 0);
     assertEquals(0, stream.available());
     stream.seek(new PositionProvider() {
       @Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
index 178e768..3f66ace 100644
--- a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -61,7 +61,7 @@ public class TestIntegerCompressionReader {
     inBuf.flip();
     RunLengthIntegerReaderV2 in =
       new RunLengthIntegerReaderV2(InStream.create("test",
-          new BufferChunk(inBuf, 0), inBuf.remaining(),
+          new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
           InStream.options().withCodec(codec).withBufferSize(1000)), true, false);
     for(int i=0; i < 2048; ++i) {
       int x = (int) in.next();
@@ -114,7 +114,7 @@ public class TestIntegerCompressionReader {
     inBuf.flip();
     RunLengthIntegerReaderV2 in =
       new RunLengthIntegerReaderV2(InStream.create("test",
-                                                   new BufferChunk(inBuf, 0),
+                                                   new BufferChunk(inBuf, 0), 0,
                                                    inBuf.remaining()), true, false);
     for(int i=0; i < 2048; i += 10) {
       int x = (int) in.next();
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index b905c7e..95dedf6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
 
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.InMemoryKeystore;
 import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
 import org.apache.orc.impl.writer.StreamOptions;
@@ -35,22 +36,12 @@ import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.Key;
-import java.security.NoSuchAlgorithmException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class TestOutStream {
 
-  public static final boolean TEST_AES_256;
-  static {
-    try {
-      TEST_AES_256 = Cipher.getMaxAllowedKeyLength("AES") != 128;
-    } catch (NoSuchAlgorithmException e) {
-      throw new IllegalArgumentException("Unknown algorithm", e);
-    }
-  }
-
   @Test
   public void testFlush() throws Exception {
     PhysicalWriter.OutputReceiver receiver =
@@ -180,7 +171,7 @@ public class TestOutStream {
   @Test
   public void testCompression256Encryption() throws Exception {
     // disable test if AES_256 is not available
-    Assume.assumeTrue(TEST_AES_256);
+    Assume.assumeTrue(InMemoryKeystore.SUPPORTS_AES_256);
     TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
     EncryptionAlgorithm aes256 = EncryptionAlgorithm.AES_CTR_256;
     byte[] keyBytes = new byte[aes256.keyLength()];
@@ -211,7 +202,7 @@ public class TestOutStream {
     // use InStream to decompress it
     BufferChunkList ranges = new BufferChunkList();
     ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
-    InStream decompressedStream = InStream.create("test", ranges.get(),
+    InStream decompressedStream = InStream.create("test", ranges.get(), 0,
         compressed.length,
         InStream.options().withCodec(new ZlibCodec()).withBufferSize(1024));
 
diff --git a/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
index 6d2d298..333bc98 100644
--- a/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
+++ b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
@@ -57,7 +57,7 @@ public class TestPhysicalFsWriter {
     }
 
     @Override
-    public void write(int b) throws IOException {
+    public void write(int b) {
       contents.add(new byte[]{(byte) b});
     }
 
@@ -97,12 +97,12 @@ public class TestPhysicalFsWriter {
 
     @Override
     public FSDataOutputStream append(Path f, int bufferSize,
-                                     Progressable progress) throws IOException {
+                                     Progressable progress) {
       throw new UnsupportedOperationException("append not supported");
     }
 
     @Override
-    public boolean rename(Path src, Path dst) throws IOException {
+    public boolean rename(Path src, Path dst) {
       boolean result = fileContents.containsKey(src) &&
           !fileContents.containsKey(dst);
       if (result) {
@@ -113,14 +113,14 @@ public class TestPhysicalFsWriter {
     }
 
     @Override
-    public boolean delete(Path f, boolean recursive) throws IOException {
+    public boolean delete(Path f, boolean recursive) {
       boolean result = fileContents.containsKey(f);
       fileContents.remove(f);
       return result;
     }
 
     @Override
-    public FileStatus[] listStatus(Path f) throws IOException {
+    public FileStatus[] listStatus(Path f) {
       return new FileStatus[]{getFileStatus(f)};
     }
 
@@ -135,12 +135,12 @@ public class TestPhysicalFsWriter {
     }
 
     @Override
-    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    public boolean mkdirs(Path f, FsPermission permission) {
       return false;
     }
 
     @Override
-    public FileStatus getFileStatus(Path f) throws IOException {
+    public FileStatus getFileStatus(Path f) {
       List<byte[]> contents = fileContents.get(f);
       if (contents != null) {
         long sum = 0;
@@ -262,7 +262,8 @@ public class TestPhysicalFsWriter {
     }
 
     @Override
-    public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+    public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                                ByteBufferPoolShim pool) {
       return null;
     }
 
@@ -276,7 +277,7 @@ public class TestPhysicalFsWriter {
     }
 
     @Override
-    public KeyProvider getKeyProvider(Configuration conf, Random random) throws IOException {
+    public KeyProvider getKeyProvider(Configuration conf, Random random) {
       return null;
     }
   }
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
index 6887f26..cc81dbc 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -48,7 +48,7 @@ public class TestRunLengthByteReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
-        new BufferChunk(inBuf, 0), inBuf.remaining()));
+        new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
     for(int i=0; i < 2048; ++i) {
       int x = in.next() & 0xff;
       if (i < 1024) {
@@ -92,7 +92,7 @@ public class TestRunLengthByteReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
-        new BufferChunk(inBuf, 0), inBuf.remaining(),
+        new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
         InStream.options().withCodec(codec).withBufferSize(500)));
     for(int i=0; i < 2048; ++i) {
       int x = in.next() & 0xff;
@@ -130,7 +130,7 @@ public class TestRunLengthByteReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
-        new BufferChunk(inBuf, 0), inBuf.remaining()));
+        new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
     for(int i=0; i < 2048; i += 10) {
       int x = in.next() & 0xff;
       if (i < 1024) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
index 6b29152..ac41e6e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -17,8 +17,6 @@
  */
 package org.apache.orc.impl;
 
-import static junit.framework.Assert.assertEquals;
-
 import java.nio.ByteBuffer;
 import java.util.Random;
 
@@ -26,6 +24,8 @@ import org.apache.orc.CompressionCodec;
 import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestRunLengthIntegerReader {
 
   public void runSeekTest(CompressionCodec codec) throws Exception {
@@ -60,7 +60,7 @@ public class TestRunLengthIntegerReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
-        ("test", new BufferChunk(inBuf, 0), inBuf.remaining(),
+        ("test", new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
             InStream.options().withCodec(codec).withBufferSize(1000)), true);
     for(int i=0; i < 2048; ++i) {
       int x = (int) in.next();
@@ -112,7 +112,7 @@ public class TestRunLengthIntegerReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
-        ("test", new BufferChunk(inBuf, 0), inBuf.remaining()), true);
+        ("test", new BufferChunk(inBuf, 0), 0, inBuf.remaining()), true);
     for(int i=0; i < 2048; i += 10) {
       int x = (int) in.next();
       if (i < 1024) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index 1102d12..87478d4 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -23,10 +23,8 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,7 +45,6 @@ import org.apache.orc.RecordReader;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -1581,7 +1578,17 @@ public class TestSchemaEvolution {
       buffer[i] = (byte) values[i];
     }
     ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), 0));
-    streams.put(name, InStream.create(name.toString(), ranges.get(), values.length));
+    streams.put(name, InStream.create(name.toString(), ranges.get(), 0,
+        values.length));
+  }
+
+  static ByteBuffer createBuffer(int... values) {
+    ByteBuffer result = ByteBuffer.allocate(values.length);
+    for(int v: values) {
+      result.put((byte) v);
+    }
+    result.flip();
+    return result;
   }
 
   @Test