You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text view).
Posted to commits@orc.apache.org by om...@apache.org on 2018/10/25 11:42:19 UTC

[orc] branch master updated: ORC-421: Separate compression options from the CompressionCodec.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new b40f6d6  ORC-421: Separate compression options from the CompressionCodec.
b40f6d6 is described below

commit b40f6d682e0d521f3c46fe65b5c9a2934a4a05ab
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Fri Oct 19 20:45:10 2018 -0700

    ORC-421: Separate compression options from the CompressionCodec.
    
    Fixes #327
    
    Signed-off-by: Owen O'Malley <om...@apache.org>
---
 .../src/java/org/apache/orc/CompressionCodec.java  |  38 ++++----
 .../src/java/org/apache/orc/PhysicalWriter.java    |   6 +-
 .../org/apache/orc/impl/AircompressorCodec.java    |  24 ++++-
 .../src/java/org/apache/orc/impl/OutStream.java    |  11 +--
 .../java/org/apache/orc/impl/PhysicalFsWriter.java |  33 +++++--
 .../src/java/org/apache/orc/impl/WriterImpl.java   |  78 ++++++++--------
 .../src/java/org/apache/orc/impl/ZlibCodec.java    | 104 +++++++++++----------
 .../org/apache/orc/impl/writer/StreamOptions.java  |   9 +-
 .../test/org/apache/orc/TestStringDictionary.java  |   3 +-
 .../org/apache/orc/impl/TestBitFieldReader.java    |  17 ++--
 .../src/test/org/apache/orc/impl/TestBitPack.java  |   3 +-
 .../src/test/org/apache/orc/impl/TestInStream.java |  27 ++++--
 .../orc/impl/TestIntegerCompressionReader.java     |   9 +-
 .../test/org/apache/orc/impl/TestOutStream.java    |   7 +-
 .../apache/orc/impl/TestRunLengthByteReader.java   |  21 +++--
 .../orc/impl/TestRunLengthIntegerReader.java       |   9 +-
 .../src/test/org/apache/orc/impl/TestZlib.java     |   3 +-
 .../src/test/org/apache/orc/impl/TestRLEv2.java    |   7 +-
 18 files changed, 238 insertions(+), 171 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index 1d2af57..b45a711 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -23,26 +23,40 @@ import java.util.EnumSet;
 
 public interface CompressionCodec {
 
-  enum Modifier {
+  enum SpeedModifier {
     /* speed/compression tradeoffs */
     FASTEST,
     FAST,
-    DEFAULT,
-    /* data sensitivity modifiers */
+    DEFAULT
+  }
+
+  enum DataKind {
     TEXT,
     BINARY
-  };
+  }
+
+  interface Options {
+    Options setSpeed(SpeedModifier newValue);
+    Options setData(DataKind newValue);
+  }
+
+  /**
+   * Create an instance of the default options for this codec.
+   * @return a new options object
+   */
+  Options createOptions();
 
   /**
    * Compress the in buffer to the out buffer.
    * @param in the bytes to compress
    * @param out the uncompressed bytes
    * @param overflow put any additional bytes here
+   * @param options the options to control compression
    * @return true if the output is smaller than input
    * @throws IOException
    */
-  boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow
-                  ) throws IOException;
+  boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow,
+                   Options options) throws IOException;
 
   /**
    * Decompress the in buffer to the out buffer.
@@ -52,18 +66,6 @@ public interface CompressionCodec {
    */
   void decompress(ByteBuffer in, ByteBuffer out) throws IOException;
 
-  /**
-   * Produce a modified compression codec if the underlying algorithm allows
-   * modification.
-   *
-   * This does not modify the current object, but returns a new object if
-   * modifications are possible. Returns the same object if no modifications
-   * are possible.
-   * @param modifiers compression modifiers (nullable)
-   * @return codec for use after optional modification
-   */
-  CompressionCodec modify(EnumSet<Modifier> modifiers);
-
   /** Resets the codec, preparing it for reuse. */
   void reset();
 
diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java
index 051688b..e25e81c 100644
--- a/java/core/src/java/org/apache/orc/PhysicalWriter.java
+++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java
@@ -68,8 +68,7 @@ public interface PhysicalWriter {
    * @param codec the compression codec to use
    */
   void writeIndex(StreamName name,
-                  OrcProto.RowIndex.Builder index,
-                  CompressionCodec codec) throws IOException;
+                  OrcProto.RowIndex.Builder index) throws IOException;
 
   /**
    * Write a bloom filter index in the given stream name.
@@ -78,8 +77,7 @@ public interface PhysicalWriter {
    * @param codec the compression codec to use
    */
   void writeBloomFilter(StreamName name,
-                        OrcProto.BloomFilterIndex.Builder bloom,
-                        CompressionCodec codec) throws IOException;
+                        OrcProto.BloomFilterIndex.Builder bloom) throws IOException;
 
   /**
    * Flushes the data in all the streams, spills them to disk, write out stripe
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index 2609d26..1f52d85 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -25,7 +25,6 @@ import org.apache.orc.CompressionKind;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.EnumSet;
 
 public class AircompressorCodec implements CompressionCodec {
   private final CompressionKind kind;
@@ -59,7 +58,8 @@ public class AircompressorCodec implements CompressionCodec {
 
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
-                          ByteBuffer overflow) throws IOException {
+                          ByteBuffer overflow,
+                          Options options) {
     int inBytes = in.remaining();
     // I should work on a patch for Snappy to support an overflow buffer
     // to prevent the extra buffer copy.
@@ -98,10 +98,24 @@ public class AircompressorCodec implements CompressionCodec {
     out.flip();
   }
 
+  /**
+   * Return an options object that doesn't do anything
+   * @return a new options object
+   */
   @Override
-  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
-    // snappy allows no modifications
-    return this;
+  public Options createOptions() {
+    return new Options() {
+
+      @Override
+      public Options setSpeed(SpeedModifier newValue) {
+        return this;
+      }
+
+      @Override
+      public Options setData(DataKind newValue) {
+        return this;
+      }
+    };
   }
 
   @Override
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index 435f43c..f79999b 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -66,24 +66,19 @@ public class OutStream extends PositionedOutputStream {
   private ByteBuffer overflow = null;
   private final int bufferSize;
   private final CompressionCodec codec;
+  private final CompressionCodec.Options options;
   private long compressedBytes = 0;
   private long uncompressedBytes = 0;
   private final Cipher cipher;
   private final Key key;
 
   public OutStream(String name,
-                   int bufferSize,
-                   CompressionCodec codec,
-                   PhysicalWriter.OutputReceiver receiver) {
-    this(name, new StreamOptions(bufferSize).withCodec(codec), receiver);
-  }
-
-  public OutStream(String name,
                    StreamOptions options,
                    PhysicalWriter.OutputReceiver receiver) {
     this.name = name;
     this.bufferSize = options.getBufferSize();
     this.codec = options.getCodec();
+    this.options = options.getCodecOptions();
     this.receiver = receiver;
     if (options.isEncrypted()) {
       this.cipher = options.getAlgorithm().createCipher();
@@ -258,7 +253,7 @@ public class OutStream extends PositionedOutputStream {
       }
       int sizePosn = compressed.position();
       compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow)) {
+      if (codec.compress(current, compressed, overflow, options)) {
         uncompressedBytes = 0;
         // move position back to after the header
         current.position(HEADER_SIZE);
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 7a2be5b..c344878 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -36,6 +36,7 @@ import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   private final int bufferSize;
   private final int maxPadding;
   private final CompressionKind compress;
+  private final OrcFile.CompressionStrategy compressionStrategy;
   private CompressionCodec codec;
   private final boolean addBlockPadding;
   private final boolean writeVariableLengthBlocks;
@@ -87,6 +89,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
           opts.getBufferSize());
     }
     this.compress = opts.getCompress();
+    this.compressionStrategy = opts.getCompressionStrategy();
     this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
     this.blockSize = opts.getBlockSize();
     LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
@@ -96,7 +99,11 @@ public class PhysicalFsWriter implements PhysicalWriter {
         fs.getDefaultReplication(path), blockSize);
     blockOffset = 0;
     codec = OrcCodecPool.getCodec(compress);
-    writer = new OutStream("metadata", bufferSize, codec,
+    StreamOptions options = new StreamOptions(bufferSize);
+    if (codec != null) {
+      options.withCodec(codec, codec.createOptions());
+    }
+    writer = new OutStream("metadata", options,
         new DirectStream(rawWriter));
     protobufWriter = CodedOutputStream.newInstance(writer);
     writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
@@ -392,22 +399,30 @@ public class PhysicalFsWriter implements PhysicalWriter {
     return result;
   }
 
+  StreamOptions getOptions(OrcProto.Stream.Kind kind) {
+    StreamOptions options = new StreamOptions(bufferSize);
+    if (codec != null) {
+      options.withCodec(codec, WriterImpl.getCustomizedCodec(codec,
+          compressionStrategy, kind));
+    }
+    return options;
+  }
+
   @Override
   public void writeIndex(StreamName name,
-                         OrcProto.RowIndex.Builder index,
-                         CompressionCodec codec) throws IOException {
-    OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
-        createDataStream(name));
+                         OrcProto.RowIndex.Builder index) throws IOException {
+    OutputStream stream = new OutStream(path.toString(),
+        getOptions(name.getKind()), createDataStream(name));
     index.build().writeTo(stream);
     stream.flush();
   }
 
   @Override
   public void writeBloomFilter(StreamName name,
-                               OrcProto.BloomFilterIndex.Builder bloom,
-                               CompressionCodec codec) throws IOException {
-    OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
-        createDataStream(name));
+                               OrcProto.BloomFilterIndex.Builder bloom
+                               ) throws IOException {
+    OutputStream stream = new OutStream(path.toString(),
+        getOptions(name.getKind()), createDataStream(name));
     bloom.build().writeTo(stream);
     stream.flush();
   }
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index ac071bc..dc621ed 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -43,6 +43,7 @@ import org.apache.orc.PhysicalWriter;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.apache.orc.impl.writer.TreeWriter;
 import org.apache.orc.impl.writer.WriterContext;
 import org.slf4j.Logger;
@@ -270,37 +271,35 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
   }
 
 
-  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
-    // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
-    //       but at this point there's no close() for the stream.
-    CompressionCodec result = physicalWriter.getCompressionCodec();
-    if (result != null) {
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-        case BLOOM_FILTER_UTF8:
-          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
-                CompressionCodec.Modifier.TEXT));
-          } else {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
-                CompressionCodec.Modifier.TEXT));
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
-              CompressionCodec.Modifier.BINARY));
-          break;
-        default:
-          LOG.info("Missing ORC compression modifiers for " + kind);
-          break;
-      }
+  public static
+  CompressionCodec.Options getCustomizedCodec(CompressionCodec codec,
+                                              OrcFile.CompressionStrategy strategy,
+                                              OrcProto.Stream.Kind kind) {
+    CompressionCodec.Options result = codec.createOptions();
+    switch (kind) {
+      case BLOOM_FILTER:
+      case DATA:
+      case DICTIONARY_DATA:
+      case BLOOM_FILTER_UTF8:
+        result.setData(CompressionCodec.DataKind.TEXT);
+        if (strategy == OrcFile.CompressionStrategy.SPEED) {
+          result.setSpeed(CompressionCodec.SpeedModifier.FAST);
+        } else {
+          result.setSpeed(CompressionCodec.SpeedModifier.DEFAULT);
+        }
+        break;
+      case LENGTH:
+      case DICTIONARY_COUNT:
+      case PRESENT:
+      case ROW_INDEX:
+      case SECONDARY:
+        // easily compressed using the fastest modes
+        result.setSpeed(CompressionCodec.SpeedModifier.FASTEST)
+            .setData(CompressionCodec.DataKind.BINARY);
+        break;
+      default:
+        LOG.info("Missing ORC compression modifiers for " + kind);
+        break;
     }
     return result;
   }
@@ -320,10 +319,14 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
                                   OrcProto.Stream.Kind kind
                                   ) throws IOException {
       final StreamName name = new StreamName(column, kind);
-      CompressionCodec codec = getCustomizedCodec(kind);
-
-      return new OutStream(physicalWriter.toString(), bufferSize, codec,
-          physicalWriter.createDataStream(name));
+      CompressionCodec codec = physicalWriter.getCompressionCodec();
+      StreamOptions options = new StreamOptions(bufferSize);
+      if (codec != null) {
+        options.withCodec(codec, getCustomizedCodec(codec, compressionStrategy,
+            kind));
+      }
+      return new OutStream(physicalWriter.toString(),
+          options, physicalWriter.createDataStream(name));
     }
 
     /**
@@ -404,14 +407,13 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
 
     public void writeIndex(StreamName name,
                            OrcProto.RowIndex.Builder index) throws IOException {
-      physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
+      physicalWriter.writeIndex(name, index);
     }
 
     public void writeBloomFilter(StreamName name,
                                  OrcProto.BloomFilterIndex.Builder bloom
                                  ) throws IOException {
-      physicalWriter.writeBloomFilter(name, bloom,
-          getCustomizedCodec(name.getKind()));
+      physicalWriter.writeBloomFilter(name, bloom);
     }
 
     public boolean getUseUTCTimestamp() {
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index 19a3728..24553b9 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -33,27 +33,69 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
   private HadoopShims.DirectDecompressor decompressShim = null;
   private Boolean direct = null;
 
-  private int level;
-  private int strategy;
+  static class ZlibOptions implements Options {
+    private int level;
+    private int strategy;
 
-  public ZlibCodec() {
-    level = Deflater.DEFAULT_COMPRESSION;
-    strategy = Deflater.DEFAULT_STRATEGY;
+    ZlibOptions(int level, int strategy) {
+      this.level = level;
+      this.strategy = strategy;
+    }
+
+    @Override
+    public ZlibOptions setSpeed(SpeedModifier newValue) {
+      switch (newValue) {
+        case FAST:
+          // deflate_fast looking for 16 byte patterns
+          level = Deflater.BEST_SPEED + 1;
+          break;
+        case DEFAULT:
+          // deflate_slow looking for 128 byte patterns
+          level = Deflater.DEFAULT_COMPRESSION;
+          break;
+        case FASTEST:
+          // deflate_fast looking for 8 byte patterns
+          level = Deflater.BEST_SPEED;
+          break;
+        default:
+          break;
+      }
+      return this;
+    }
+
+    @Override
+    public ZlibOptions setData(DataKind newValue) {
+      switch (newValue) {
+        case BINARY:
+          /* filtered == less LZ77, more huffman */
+          strategy = Deflater.FILTERED;
+          break;
+        case TEXT:
+          strategy = Deflater.DEFAULT_STRATEGY;
+          break;
+        default:
+          break;
+      }
+      return this;
+    }
   }
 
-  private ZlibCodec(int level, int strategy) {
-    this.level = level;
-    this.strategy = strategy;
+  @Override
+  public Options createOptions() {
+    return new ZlibOptions(Deflater.DEFAULT_COMPRESSION,
+        Deflater.DEFAULT_STRATEGY);
   }
 
   @Override
   public boolean compress(ByteBuffer in, ByteBuffer out,
-                          ByteBuffer overflow) throws IOException {
+                          ByteBuffer overflow,
+                          Options options) {
+    ZlibOptions zlo = (ZlibOptions) options;
     int length = in.remaining();
     int outSize = 0;
-    Deflater deflater = new Deflater(level, true);
+    Deflater deflater = new Deflater(zlo.level, true);
     try {
-      deflater.setStrategy(strategy);
+      deflater.setStrategy(zlo.strategy);
       deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
       deflater.finish();
       int offset = out.arrayOffset() + out.position();
@@ -136,47 +178,7 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
   }
 
   @Override
-  public CompressionCodec modify(/* @Nullable */ EnumSet<Modifier> modifiers) {
-
-    if (modifiers == null) {
-      return this;
-    }
-
-    int l = this.level;
-    int s = this.strategy;
-
-    for (Modifier m : modifiers) {
-      switch (m) {
-      case BINARY:
-        /* filtered == less LZ77, more huffman */
-        s = Deflater.FILTERED;
-        break;
-      case TEXT:
-        s = Deflater.DEFAULT_STRATEGY;
-        break;
-      case FASTEST:
-        // deflate_fast looking for 8 byte patterns
-        l = Deflater.BEST_SPEED;
-        break;
-      case FAST:
-        // deflate_fast looking for 16 byte patterns
-        l = Deflater.BEST_SPEED + 1;
-        break;
-      case DEFAULT:
-        // deflate_slow looking for 128 byte patterns
-        l = Deflater.DEFAULT_COMPRESSION;
-        break;
-      default:
-        break;
-      }
-    }
-    return new ZlibCodec(l, s);
-  }
-
-  @Override
   public void reset() {
-    level = Deflater.DEFAULT_COMPRESSION;
-    strategy = Deflater.DEFAULT_STRATEGY;
     if (decompressShim != null) {
       decompressShim.reset();
     }
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
index 3d0c48a..4d800ac 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
@@ -27,6 +27,7 @@ import java.security.Key;
  */
 public class StreamOptions {
   private CompressionCodec codec;
+  private CompressionCodec.Options options;
   private final int bufferSize;
   private EncryptionAlgorithm algorithm;
   private Key key;
@@ -45,8 +46,10 @@ public class StreamOptions {
    * @param codec the codec to compress with
    * @return this
    */
-  public StreamOptions withCodec(CompressionCodec codec) {
+  public StreamOptions withCodec(CompressionCodec codec,
+                                 CompressionCodec.Options options) {
     this.codec = codec;
+    this.options = options;
     return this;
   }
 
@@ -63,6 +66,10 @@ public class StreamOptions {
     return codec;
   }
 
+  public CompressionCodec.Options getCodecOptions() {
+    return options;
+  }
+
   public int getBufferSize() {
     return bufferSize;
   }
diff --git a/java/core/src/test/org/apache/orc/TestStringDictionary.java b/java/core/src/test/org/apache/orc/TestStringDictionary.java
index cc4f8d8..27965fe 100644
--- a/java/core/src/test/org/apache/orc/TestStringDictionary.java
+++ b/java/core/src/test/org/apache/orc/TestStringDictionary.java
@@ -36,6 +36,7 @@ import org.apache.orc.impl.OutStream;
 import org.apache.orc.impl.RecordReaderImpl;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.TestInStream;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.apache.orc.impl.writer.StringTreeWriter;
 import org.apache.orc.impl.writer.TreeWriter;
 import org.apache.orc.impl.writer.WriterContext;
@@ -174,7 +175,7 @@ public class TestStringDictionary {
     public OutStream createStream(int column, OrcProto.Stream.Kind kind) throws IOException {
       TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
       streams.put(new StreamName(column, kind), collect);
-      return new OutStream("test", 1000, null, collect);
+      return new OutStream("test", new StreamOptions(1000), collect);
     }
 
     @Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index 03c379e..25483ca 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -17,11 +17,12 @@
  */
 package org.apache.orc.impl;
 
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
 
 import java.nio.ByteBuffer;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Test;
 
 public class TestBitFieldReader {
@@ -29,8 +30,12 @@ public class TestBitFieldReader {
   public void runSeekTest(CompressionCodec codec) throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
     final int COUNT = 16384;
+    StreamOptions options = new StreamOptions(500);
+    if (codec != null) {
+      options.withCodec(codec, codec.createOptions());
+    }
     BitFieldWriter out = new BitFieldWriter(
-        new OutStream("test", 500, codec, collect), 1);
+        new OutStream("test", options, collect), 1);
     TestInStream.PositionCollector[] positions =
         new TestInStream.PositionCollector[COUNT];
     for(int i=0; i < COUNT; ++i) {
@@ -83,7 +88,7 @@ public class TestBitFieldReader {
   public void testSkips() throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
     BitFieldWriter out = new BitFieldWriter(
-        new OutStream("test", 100, null, collect), 1);
+        new OutStream("test", new StreamOptions(100), collect), 1);
     final int COUNT = 16384;
     for(int i=0; i < COUNT; ++i) {
       if (i < COUNT/2) {
@@ -99,7 +104,7 @@ public class TestBitFieldReader {
     BitFieldReader in = new BitFieldReader(InStream.create("test",
         new BufferChunk(inBuf, 0), inBuf.remaining()));
     for(int i=0; i < COUNT; i += 5) {
-      int x = (int) in.next();
+      int x = in.next();
       if (i < COUNT/2) {
         assertEquals(i & 1, x);
       } else {
@@ -116,7 +121,7 @@ public class TestBitFieldReader {
   public void testSeekSkip() throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
     BitFieldWriter out = new BitFieldWriter(
-        new OutStream("test", 100, null, collect), 1);
+        new OutStream("test", new StreamOptions(100), collect), 1);
     final int COUNT = 256;
     TestInStream.PositionCollector posn = new TestInStream.PositionCollector();
     for(int i=0; i < COUNT; ++i) {
@@ -138,7 +143,7 @@ public class TestBitFieldReader {
     in.seek(posn);
     in.skip(10);
     for(int r = 210; r < COUNT; ++r) {
-      int x = (int) in.next();
+      int x = in.next();
       if (r < COUNT/2) {
         assertEquals(r & 1, x);
       } else {
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index 3eba3e6..d298ecc 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -29,6 +29,7 @@ import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -101,7 +102,7 @@ public class TestBitPack {
     SerializationUtils utils = new SerializationUtils();
     int fixedWidth = utils.findClosestNumBits(rangeInput);
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
-    OutStream output = new OutStream("test", SIZE, null, collect);
+    OutStream output = new OutStream("test", new StreamOptions(SIZE), collect);
     utils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output);
     output.flush();
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index fe7b53d..78098b2 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -92,7 +92,7 @@ public class TestInStream {
   @Test
   public void testUncompressed() throws Exception {
     OutputCollector collect = new OutputCollector();
-    OutStream out = new OutStream("test", 100, null, collect);
+    OutStream out = new OutStream("test", new StreamOptions(100), collect);
     PositionCollector[] positions = new PositionCollector[1024];
     for(int i=0; i < 1024; ++i) {
       positions[i] = new PositionCollector();
@@ -189,8 +189,9 @@ public class TestInStream {
     Key decryptKey = new SecretKeySpec(rawKey, algorithm.getAlgorithm());
     StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
     byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);
+    CompressionCodec codec = new ZlibCodec();
     StreamOptions writerOptions = new StreamOptions(500)
-        .withCodec(new ZlibCodec())
+        .withCodec(codec, codec.createOptions())
         .withEncryption(algorithm, decryptKey, iv);
     OutStream out = new OutStream("test", writerOptions, collect);
     PositionCollector[] positions = new PositionCollector[ROW_COUNT];
@@ -241,7 +242,9 @@ public class TestInStream {
   public void testCompressed() throws Exception {
     OutputCollector collect = new OutputCollector();
     CompressionCodec codec = new ZlibCodec();
-    OutStream out = new OutStream("test", 300, codec, collect);
+    StreamOptions options = new StreamOptions(300)
+        .withCodec(codec, codec.createOptions());
+    OutStream out = new OutStream("test", options, collect);
     PositionCollector[] positions = new PositionCollector[1024];
     for(int i=0; i < 1024; ++i) {
       positions[i] = new PositionCollector();
@@ -275,7 +278,9 @@ public class TestInStream {
   public void testCorruptStream() throws Exception {
     OutputCollector collect = new OutputCollector();
     CompressionCodec codec = new ZlibCodec();
-    OutStream out = new OutStream("test", 500, codec, collect);
+    StreamOptions options = new StreamOptions(500)
+                                .withCodec(codec, codec.createOptions());
+    OutStream out = new OutStream("test", options, collect);
     PositionCollector[] positions = new PositionCollector[1024];
     for(int i=0; i < 1024; ++i) {
       positions[i] = new PositionCollector();
@@ -319,7 +324,9 @@ public class TestInStream {
   public void testDisjointBuffers() throws Exception {
     OutputCollector collect = new OutputCollector();
     CompressionCodec codec = new ZlibCodec();
-    OutStream out = new OutStream("test", 400, codec, collect);
+    StreamOptions options = new StreamOptions(400)
+                                .withCodec(codec, codec.createOptions());
+    OutStream out = new OutStream("test", options, collect);
     PositionCollector[] positions = new PositionCollector[1024];
     DataOutput stream = new DataOutputStream(out);
     for(int i=0; i < 1024; ++i) {
@@ -345,9 +352,9 @@ public class TestInStream {
       buffers.add(new BufferChunk(buffer, offset));
       offset += buffer.remaining();
     }
-    InStream.StreamOptions options = InStream.options()
+    InStream.StreamOptions inOptions = InStream.options()
         .withCodec(codec).withBufferSize(400);
-    InStream in = InStream.create("test", buffers.get(), 1674, options);
+    InStream in = InStream.create("test", buffers.get(), 1674, inOptions);
     assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
                  " offset: 0 limit: 483 range 0 = 0 to 483;" +
                  "  range 1 = 483 to 1625;  range 2 = 1625 to 1674",
@@ -366,7 +373,7 @@ public class TestInStream {
     buffers.clear();
     buffers.add(new BufferChunk(inBuf[1], 483));
     buffers.add(new BufferChunk(inBuf[2], 1625));
-    in = InStream.create("test", buffers.get(), 1674, options);
+    in = InStream.create("test", buffers.get(), 1674, inOptions);
     inStream = new DataInputStream(in);
     positions[303].reset();
     in.seek(positions[303]);
@@ -377,7 +384,7 @@ public class TestInStream {
     buffers.clear();
     buffers.add(new BufferChunk(inBuf[0], 0));
     buffers.add(new BufferChunk(inBuf[2], 1625));
-    in = InStream.create("test", buffers.get(), 1674, options);
+    in = InStream.create("test", buffers.get(), 1674, inOptions);
     inStream = new DataInputStream(in);
     positions[1001].reset();
     for(int i=0; i < 300; ++i) {
@@ -392,7 +399,7 @@ public class TestInStream {
   @Test
   public void testUncompressedDisjointBuffers() throws Exception {
     OutputCollector collect = new OutputCollector();
-    OutStream out = new OutStream("test", 400, null, collect);
+    OutStream out = new OutStream("test", new StreamOptions(400), collect);
     PositionCollector[] positions = new PositionCollector[1024];
     DataOutput stream = new DataOutputStream(out);
     for(int i=0; i < 1024; ++i) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
index 0888cef..c11caa6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -23,14 +23,19 @@ import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Test;
 
 public class TestIntegerCompressionReader {
 
   public void runSeekTest(CompressionCodec codec) throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    StreamOptions options = new StreamOptions(1000);
+    if (codec != null) {
+      options.withCodec(codec, codec.createOptions());
+    }
     RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
-        new OutStream("test", 1000, codec, collect), true);
+        new OutStream("test", options, collect), true);
     TestInStream.PositionCollector[] positions =
         new TestInStream.PositionCollector[4096];
     Random random = new Random(99);
@@ -95,7 +100,7 @@ public class TestIntegerCompressionReader {
   public void testSkips() throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
     RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
-        new OutStream("test", 100, null, collect), true);
+        new OutStream("test", new StreamOptions(100), collect), true);
     for(int i=0; i < 2048; ++i) {
       if (i < 1024) {
         out.write(i);
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index 473c7a6..87c1fce 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -56,7 +56,9 @@ public class TestOutStream {
     PhysicalWriter.OutputReceiver receiver =
         Mockito.mock(PhysicalWriter.OutputReceiver.class);
     CompressionCodec codec = new ZlibCodec();
-    OutStream stream = new OutStream("test", 128*1024, codec, receiver);
+    StreamOptions options = new StreamOptions(128 * 1024)
+        .withCodec(codec, codec.createOptions());
+    OutStream stream = new OutStream("test", options, receiver);
     assertEquals(0L, stream.getBufferSize());
     stream.write(new byte[]{0, 1, 2});
     stream.flush();
@@ -183,8 +185,9 @@ public class TestOutStream {
     Key material = new SecretKeySpec(keyBytes, aes256.getAlgorithm());
     StreamName name = new StreamName(0x1, OrcProto.Stream.Kind.DATA);
     byte[] iv = CryptoUtils.createIvForStream(aes256, name, 0);
+    CompressionCodec codec = new ZlibCodec();
     StreamOptions options = new StreamOptions(1024)
-        .withCodec(new ZlibCodec())
+        .withCodec(codec, codec.createOptions())
         .withEncryption(aes256, material, iv);
     OutStream stream = new OutStream("test", options, receiver);
     for(int i=0; i < 10000; ++i) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
index c0b22fa..b3da5d8 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,20 +17,21 @@
  */
 package org.apache.orc.impl;
 
-import static junit.framework.Assert.assertEquals;
-
 import java.nio.ByteBuffer;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class TestRunLengthByteReader {
 
   @Test
   public void testUncompressedSeek() throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
-    RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 100,
-        null, collect));
+    RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test",
+        new StreamOptions(100), collect));
     TestInStream.PositionCollector[] positions =
         new TestInStream.PositionCollector[2048];
     for(int i=0; i < 2048; ++i) {
@@ -70,9 +71,11 @@ public class TestRunLengthByteReader {
   @Test
   public void testCompressedSeek() throws Exception {
     CompressionCodec codec = new SnappyCodec();
+    StreamOptions options = new StreamOptions(500)
+                                .withCodec(codec, codec.createOptions());
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
-    RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 500,
-        codec, collect));
+    RunLengthByteWriter out = new RunLengthByteWriter(
+        new OutStream("test", options, collect));
     TestInStream.PositionCollector[] positions =
         new TestInStream.PositionCollector[2048];
     for(int i=0; i < 2048; ++i) {
@@ -113,8 +116,8 @@ public class TestRunLengthByteReader {
   @Test
   public void testSkips() throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
-    RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 100,
-        null, collect));
+    RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test",
+        new StreamOptions(100), collect));
     for(int i=0; i < 2048; ++i) {
       if (i < 1024) {
         out.write((byte) (i/16));
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
index f067f83..1f5cd41 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -23,14 +23,19 @@ import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.junit.Test;
 
 public class TestRunLengthIntegerReader {
 
   public void runSeekTest(CompressionCodec codec) throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    StreamOptions options = new StreamOptions(1000);
+    if (codec != null) {
+      options.withCodec(codec, codec.createOptions());
+    }
     RunLengthIntegerWriter out = new RunLengthIntegerWriter(
-        new OutStream("test", 1000, codec, collect), true);
+        new OutStream("test", options, collect), true);
     TestInStream.PositionCollector[] positions =
         new TestInStream.PositionCollector[4096];
     Random random = new Random(99);
@@ -94,7 +99,7 @@ public class TestRunLengthIntegerReader {
   public void testSkips() throws Exception {
     TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
     RunLengthIntegerWriter out = new RunLengthIntegerWriter(
-        new OutStream("test", 100, null, collect), true);
+        new OutStream("test", new StreamOptions(100), collect), true);
     for(int i=0; i < 2048; ++i) {
       if (i < 1024) {
         out.write(i);
diff --git a/java/core/src/test/org/apache/orc/impl/TestZlib.java b/java/core/src/test/org/apache/orc/impl/TestZlib.java
index 327ecfc..a58965d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestZlib.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZlib.java
@@ -36,7 +36,8 @@ public class TestZlib {
     in.put(new byte[]{1,2,3,4,5,6,7,10});
     in.flip();
     CompressionCodec codec = new ZlibCodec();
-    assertEquals(false, codec.compress(in, out, null));
+    assertEquals(false, codec.compress(in, out, null,
+        codec.createOptions()));
   }
 
   @Test
diff --git a/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java b/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java
index 4e55c54..441cb02 100644
--- a/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java
+++ b/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,6 +39,7 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.PhysicalWriter;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.StreamOptions;
 import org.apache.orc.tools.FileDump;
 import org.junit.Before;
 import org.junit.Rule;
@@ -346,8 +347,8 @@ public class TestRLEv2 {
                                     boolean signed) throws IOException {
     TestOutputCatcher catcher = new TestOutputCatcher();
     RunLengthIntegerWriterV2 writer =
-        new RunLengthIntegerWriterV2(new OutStream("test", 10000, null,
-            catcher), signed);
+        new RunLengthIntegerWriterV2(new OutStream("test",
+            new StreamOptions(10000), catcher), signed);
     for(long x: input) {
       writer.write(x);
     }