You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@orc.apache.org by om...@apache.org on 2018/10/25 11:42:19 UTC
[orc] branch master updated: ORC-421: Separate compression options from the CompressionCodec.
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new b40f6d6 ORC-421: Separate compression options from the CompressionCodec.
b40f6d6 is described below
commit b40f6d682e0d521f3c46fe65b5c9a2934a4a05ab
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Fri Oct 19 20:45:10 2018 -0700
ORC-421: Separate compression options from the CompressionCodec.
Fixes #327
Signed-off-by: Owen O'Malley <om...@apache.org>
---
.../src/java/org/apache/orc/CompressionCodec.java | 38 ++++----
.../src/java/org/apache/orc/PhysicalWriter.java | 6 +-
.../org/apache/orc/impl/AircompressorCodec.java | 24 ++++-
.../src/java/org/apache/orc/impl/OutStream.java | 11 +--
.../java/org/apache/orc/impl/PhysicalFsWriter.java | 33 +++++--
.../src/java/org/apache/orc/impl/WriterImpl.java | 78 ++++++++--------
.../src/java/org/apache/orc/impl/ZlibCodec.java | 104 +++++++++++----------
.../org/apache/orc/impl/writer/StreamOptions.java | 9 +-
.../test/org/apache/orc/TestStringDictionary.java | 3 +-
.../org/apache/orc/impl/TestBitFieldReader.java | 17 ++--
.../src/test/org/apache/orc/impl/TestBitPack.java | 3 +-
.../src/test/org/apache/orc/impl/TestInStream.java | 27 ++++--
.../orc/impl/TestIntegerCompressionReader.java | 9 +-
.../test/org/apache/orc/impl/TestOutStream.java | 7 +-
.../apache/orc/impl/TestRunLengthByteReader.java | 21 +++--
.../orc/impl/TestRunLengthIntegerReader.java | 9 +-
.../src/test/org/apache/orc/impl/TestZlib.java | 3 +-
.../src/test/org/apache/orc/impl/TestRLEv2.java | 7 +-
18 files changed, 238 insertions(+), 171 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index 1d2af57..b45a711 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -23,26 +23,40 @@ import java.util.EnumSet;
public interface CompressionCodec {
- enum Modifier {
+ enum SpeedModifier {
/* speed/compression tradeoffs */
FASTEST,
FAST,
- DEFAULT,
- /* data sensitivity modifiers */
+ DEFAULT
+ }
+
+ enum DataKind {
TEXT,
BINARY
- };
+ }
+
+ interface Options {
+ Options setSpeed(SpeedModifier newValue);
+ Options setData(DataKind newValue);
+ }
+
+ /**
+ * Create an instance of the default options for this codec.
+ * @return a new options object
+ */
+ Options createOptions();
/**
* Compress the in buffer to the out buffer.
* @param in the bytes to compress
* @param out the uncompressed bytes
* @param overflow put any additional bytes here
+ * @param options the options to control compression
* @return true if the output is smaller than input
* @throws IOException
*/
- boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow
- ) throws IOException;
+ boolean compress(ByteBuffer in, ByteBuffer out, ByteBuffer overflow,
+ Options options) throws IOException;
/**
* Decompress the in buffer to the out buffer.
@@ -52,18 +66,6 @@ public interface CompressionCodec {
*/
void decompress(ByteBuffer in, ByteBuffer out) throws IOException;
- /**
- * Produce a modified compression codec if the underlying algorithm allows
- * modification.
- *
- * This does not modify the current object, but returns a new object if
- * modifications are possible. Returns the same object if no modifications
- * are possible.
- * @param modifiers compression modifiers (nullable)
- * @return codec for use after optional modification
- */
- CompressionCodec modify(EnumSet<Modifier> modifiers);
-
/** Resets the codec, preparing it for reuse. */
void reset();
diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java
index 051688b..e25e81c 100644
--- a/java/core/src/java/org/apache/orc/PhysicalWriter.java
+++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java
@@ -68,8 +68,7 @@ public interface PhysicalWriter {
* @param codec the compression codec to use
*/
void writeIndex(StreamName name,
- OrcProto.RowIndex.Builder index,
- CompressionCodec codec) throws IOException;
+ OrcProto.RowIndex.Builder index) throws IOException;
/**
* Write a bloom filter index in the given stream name.
@@ -78,8 +77,7 @@ public interface PhysicalWriter {
* @param codec the compression codec to use
*/
void writeBloomFilter(StreamName name,
- OrcProto.BloomFilterIndex.Builder bloom,
- CompressionCodec codec) throws IOException;
+ OrcProto.BloomFilterIndex.Builder bloom) throws IOException;
/**
* Flushes the data in all the streams, spills them to disk, write out stripe
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index 2609d26..1f52d85 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -25,7 +25,6 @@ import org.apache.orc.CompressionKind;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.EnumSet;
public class AircompressorCodec implements CompressionCodec {
private final CompressionKind kind;
@@ -59,7 +58,8 @@ public class AircompressorCodec implements CompressionCodec {
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
- ByteBuffer overflow) throws IOException {
+ ByteBuffer overflow,
+ Options options) {
int inBytes = in.remaining();
// I should work on a patch for Snappy to support an overflow buffer
// to prevent the extra buffer copy.
@@ -98,10 +98,24 @@ public class AircompressorCodec implements CompressionCodec {
out.flip();
}
+ /**
+ * Return an options object that doesn't do anything
+ * @return a new options object
+ */
@Override
- public CompressionCodec modify(EnumSet<Modifier> modifiers) {
- // snappy allows no modifications
- return this;
+ public Options createOptions() {
+ return new Options() {
+
+ @Override
+ public Options setSpeed(SpeedModifier newValue) {
+ return this;
+ }
+
+ @Override
+ public Options setData(DataKind newValue) {
+ return this;
+ }
+ };
}
@Override
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index 435f43c..f79999b 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -66,24 +66,19 @@ public class OutStream extends PositionedOutputStream {
private ByteBuffer overflow = null;
private final int bufferSize;
private final CompressionCodec codec;
+ private final CompressionCodec.Options options;
private long compressedBytes = 0;
private long uncompressedBytes = 0;
private final Cipher cipher;
private final Key key;
public OutStream(String name,
- int bufferSize,
- CompressionCodec codec,
- PhysicalWriter.OutputReceiver receiver) {
- this(name, new StreamOptions(bufferSize).withCodec(codec), receiver);
- }
-
- public OutStream(String name,
StreamOptions options,
PhysicalWriter.OutputReceiver receiver) {
this.name = name;
this.bufferSize = options.getBufferSize();
this.codec = options.getCodec();
+ this.options = options.getCodecOptions();
this.receiver = receiver;
if (options.isEncrypted()) {
this.cipher = options.getAlgorithm().createCipher();
@@ -258,7 +253,7 @@ public class OutStream extends PositionedOutputStream {
}
int sizePosn = compressed.position();
compressed.position(compressed.position() + HEADER_SIZE);
- if (codec.compress(current, compressed, overflow)) {
+ if (codec.compress(current, compressed, overflow, options)) {
uncompressedBytes = 0;
// move position back to after the header
current.position(HEADER_SIZE);
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 7a2be5b..c344878 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -36,6 +36,7 @@ import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
private final int bufferSize;
private final int maxPadding;
private final CompressionKind compress;
+ private final OrcFile.CompressionStrategy compressionStrategy;
private CompressionCodec codec;
private final boolean addBlockPadding;
private final boolean writeVariableLengthBlocks;
@@ -87,6 +89,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
opts.getBufferSize());
}
this.compress = opts.getCompress();
+ this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
@@ -96,7 +99,11 @@ public class PhysicalFsWriter implements PhysicalWriter {
fs.getDefaultReplication(path), blockSize);
blockOffset = 0;
codec = OrcCodecPool.getCodec(compress);
- writer = new OutStream("metadata", bufferSize, codec,
+ StreamOptions options = new StreamOptions(bufferSize);
+ if (codec != null) {
+ options.withCodec(codec, codec.createOptions());
+ }
+ writer = new OutStream("metadata", options,
new DirectStream(rawWriter));
protobufWriter = CodedOutputStream.newInstance(writer);
writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
@@ -392,22 +399,30 @@ public class PhysicalFsWriter implements PhysicalWriter {
return result;
}
+ StreamOptions getOptions(OrcProto.Stream.Kind kind) {
+ StreamOptions options = new StreamOptions(bufferSize);
+ if (codec != null) {
+ options.withCodec(codec, WriterImpl.getCustomizedCodec(codec,
+ compressionStrategy, kind));
+ }
+ return options;
+ }
+
@Override
public void writeIndex(StreamName name,
- OrcProto.RowIndex.Builder index,
- CompressionCodec codec) throws IOException {
- OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
- createDataStream(name));
+ OrcProto.RowIndex.Builder index) throws IOException {
+ OutputStream stream = new OutStream(path.toString(),
+ getOptions(name.getKind()), createDataStream(name));
index.build().writeTo(stream);
stream.flush();
}
@Override
public void writeBloomFilter(StreamName name,
- OrcProto.BloomFilterIndex.Builder bloom,
- CompressionCodec codec) throws IOException {
- OutputStream stream = new OutStream(path.toString(), bufferSize, codec,
- createDataStream(name));
+ OrcProto.BloomFilterIndex.Builder bloom
+ ) throws IOException {
+ OutputStream stream = new OutStream(path.toString(),
+ getOptions(name.getKind()), createDataStream(name));
bloom.build().writeTo(stream);
stream.flush();
}
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index ac071bc..dc621ed 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -43,6 +43,7 @@ import org.apache.orc.PhysicalWriter;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.impl.writer.TreeWriter;
import org.apache.orc.impl.writer.WriterContext;
import org.slf4j.Logger;
@@ -270,37 +271,35 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
}
- CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
- // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
- // but at this point there's no close() for the stream.
- CompressionCodec result = physicalWriter.getCompressionCodec();
- if (result != null) {
- switch (kind) {
- case BLOOM_FILTER:
- case DATA:
- case DICTIONARY_DATA:
- case BLOOM_FILTER_UTF8:
- if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
- result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
- CompressionCodec.Modifier.TEXT));
- } else {
- result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
- CompressionCodec.Modifier.TEXT));
- }
- break;
- case LENGTH:
- case DICTIONARY_COUNT:
- case PRESENT:
- case ROW_INDEX:
- case SECONDARY:
- // easily compressed using the fastest modes
- result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
- CompressionCodec.Modifier.BINARY));
- break;
- default:
- LOG.info("Missing ORC compression modifiers for " + kind);
- break;
- }
+ public static
+ CompressionCodec.Options getCustomizedCodec(CompressionCodec codec,
+ OrcFile.CompressionStrategy strategy,
+ OrcProto.Stream.Kind kind) {
+ CompressionCodec.Options result = codec.createOptions();
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ case BLOOM_FILTER_UTF8:
+ result.setData(CompressionCodec.DataKind.TEXT);
+ if (strategy == OrcFile.CompressionStrategy.SPEED) {
+ result.setSpeed(CompressionCodec.SpeedModifier.FAST);
+ } else {
+ result.setSpeed(CompressionCodec.SpeedModifier.DEFAULT);
+ }
+ break;
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ result.setSpeed(CompressionCodec.SpeedModifier.FASTEST)
+ .setData(CompressionCodec.DataKind.BINARY);
+ break;
+ default:
+ LOG.info("Missing ORC compression modifiers for " + kind);
+ break;
}
return result;
}
@@ -320,10 +319,14 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
OrcProto.Stream.Kind kind
) throws IOException {
final StreamName name = new StreamName(column, kind);
- CompressionCodec codec = getCustomizedCodec(kind);
-
- return new OutStream(physicalWriter.toString(), bufferSize, codec,
- physicalWriter.createDataStream(name));
+ CompressionCodec codec = physicalWriter.getCompressionCodec();
+ StreamOptions options = new StreamOptions(bufferSize);
+ if (codec != null) {
+ options.withCodec(codec, getCustomizedCodec(codec, compressionStrategy,
+ kind));
+ }
+ return new OutStream(physicalWriter.toString(),
+ options, physicalWriter.createDataStream(name));
}
/**
@@ -404,14 +407,13 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
public void writeIndex(StreamName name,
OrcProto.RowIndex.Builder index) throws IOException {
- physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
+ physicalWriter.writeIndex(name, index);
}
public void writeBloomFilter(StreamName name,
OrcProto.BloomFilterIndex.Builder bloom
) throws IOException {
- physicalWriter.writeBloomFilter(name, bloom,
- getCustomizedCodec(name.getKind()));
+ physicalWriter.writeBloomFilter(name, bloom);
}
public boolean getUseUTCTimestamp() {
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index 19a3728..24553b9 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -33,27 +33,69 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
private HadoopShims.DirectDecompressor decompressShim = null;
private Boolean direct = null;
- private int level;
- private int strategy;
+ static class ZlibOptions implements Options {
+ private int level;
+ private int strategy;
- public ZlibCodec() {
- level = Deflater.DEFAULT_COMPRESSION;
- strategy = Deflater.DEFAULT_STRATEGY;
+ ZlibOptions(int level, int strategy) {
+ this.level = level;
+ this.strategy = strategy;
+ }
+
+ @Override
+ public ZlibOptions setSpeed(SpeedModifier newValue) {
+ switch (newValue) {
+ case FAST:
+ // deflate_fast looking for 16 byte patterns
+ level = Deflater.BEST_SPEED + 1;
+ break;
+ case DEFAULT:
+ // deflate_slow looking for 128 byte patterns
+ level = Deflater.DEFAULT_COMPRESSION;
+ break;
+ case FASTEST:
+ // deflate_fast looking for 8 byte patterns
+ level = Deflater.BEST_SPEED;
+ break;
+ default:
+ break;
+ }
+ return this;
+ }
+
+ @Override
+ public ZlibOptions setData(DataKind newValue) {
+ switch (newValue) {
+ case BINARY:
+ /* filtered == less LZ77, more huffman */
+ strategy = Deflater.FILTERED;
+ break;
+ case TEXT:
+ strategy = Deflater.DEFAULT_STRATEGY;
+ break;
+ default:
+ break;
+ }
+ return this;
+ }
}
- private ZlibCodec(int level, int strategy) {
- this.level = level;
- this.strategy = strategy;
+ @Override
+ public Options createOptions() {
+ return new ZlibOptions(Deflater.DEFAULT_COMPRESSION,
+ Deflater.DEFAULT_STRATEGY);
}
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
- ByteBuffer overflow) throws IOException {
+ ByteBuffer overflow,
+ Options options) {
+ ZlibOptions zlo = (ZlibOptions) options;
int length = in.remaining();
int outSize = 0;
- Deflater deflater = new Deflater(level, true);
+ Deflater deflater = new Deflater(zlo.level, true);
try {
- deflater.setStrategy(strategy);
+ deflater.setStrategy(zlo.strategy);
deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
deflater.finish();
int offset = out.arrayOffset() + out.position();
@@ -136,47 +178,7 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
}
@Override
- public CompressionCodec modify(/* @Nullable */ EnumSet<Modifier> modifiers) {
-
- if (modifiers == null) {
- return this;
- }
-
- int l = this.level;
- int s = this.strategy;
-
- for (Modifier m : modifiers) {
- switch (m) {
- case BINARY:
- /* filtered == less LZ77, more huffman */
- s = Deflater.FILTERED;
- break;
- case TEXT:
- s = Deflater.DEFAULT_STRATEGY;
- break;
- case FASTEST:
- // deflate_fast looking for 8 byte patterns
- l = Deflater.BEST_SPEED;
- break;
- case FAST:
- // deflate_fast looking for 16 byte patterns
- l = Deflater.BEST_SPEED + 1;
- break;
- case DEFAULT:
- // deflate_slow looking for 128 byte patterns
- l = Deflater.DEFAULT_COMPRESSION;
- break;
- default:
- break;
- }
- }
- return new ZlibCodec(l, s);
- }
-
- @Override
public void reset() {
- level = Deflater.DEFAULT_COMPRESSION;
- strategy = Deflater.DEFAULT_STRATEGY;
if (decompressShim != null) {
decompressShim.reset();
}
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
index 3d0c48a..4d800ac 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
@@ -27,6 +27,7 @@ import java.security.Key;
*/
public class StreamOptions {
private CompressionCodec codec;
+ private CompressionCodec.Options options;
private final int bufferSize;
private EncryptionAlgorithm algorithm;
private Key key;
@@ -45,8 +46,10 @@ public class StreamOptions {
* @param codec the codec to compress with
* @return this
*/
- public StreamOptions withCodec(CompressionCodec codec) {
+ public StreamOptions withCodec(CompressionCodec codec,
+ CompressionCodec.Options options) {
this.codec = codec;
+ this.options = options;
return this;
}
@@ -63,6 +66,10 @@ public class StreamOptions {
return codec;
}
+ public CompressionCodec.Options getCodecOptions() {
+ return options;
+ }
+
public int getBufferSize() {
return bufferSize;
}
diff --git a/java/core/src/test/org/apache/orc/TestStringDictionary.java b/java/core/src/test/org/apache/orc/TestStringDictionary.java
index cc4f8d8..27965fe 100644
--- a/java/core/src/test/org/apache/orc/TestStringDictionary.java
+++ b/java/core/src/test/org/apache/orc/TestStringDictionary.java
@@ -36,6 +36,7 @@ import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.RecordReaderImpl;
import org.apache.orc.impl.StreamName;
import org.apache.orc.impl.TestInStream;
+import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.impl.writer.StringTreeWriter;
import org.apache.orc.impl.writer.TreeWriter;
import org.apache.orc.impl.writer.WriterContext;
@@ -174,7 +175,7 @@ public class TestStringDictionary {
public OutStream createStream(int column, OrcProto.Stream.Kind kind) throws IOException {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
streams.put(new StreamName(column, kind), collect);
- return new OutStream("test", 1000, null, collect);
+ return new OutStream("test", new StreamOptions(1000), collect);
}
@Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index 03c379e..25483ca 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -17,11 +17,12 @@
*/
package org.apache.orc.impl;
-import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertEquals;
import java.nio.ByteBuffer;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Test;
public class TestBitFieldReader {
@@ -29,8 +30,12 @@ public class TestBitFieldReader {
public void runSeekTest(CompressionCodec codec) throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
final int COUNT = 16384;
+ StreamOptions options = new StreamOptions(500);
+ if (codec != null) {
+ options.withCodec(codec, codec.createOptions());
+ }
BitFieldWriter out = new BitFieldWriter(
- new OutStream("test", 500, codec, collect), 1);
+ new OutStream("test", options, collect), 1);
TestInStream.PositionCollector[] positions =
new TestInStream.PositionCollector[COUNT];
for(int i=0; i < COUNT; ++i) {
@@ -83,7 +88,7 @@ public class TestBitFieldReader {
public void testSkips() throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
BitFieldWriter out = new BitFieldWriter(
- new OutStream("test", 100, null, collect), 1);
+ new OutStream("test", new StreamOptions(100), collect), 1);
final int COUNT = 16384;
for(int i=0; i < COUNT; ++i) {
if (i < COUNT/2) {
@@ -99,7 +104,7 @@ public class TestBitFieldReader {
BitFieldReader in = new BitFieldReader(InStream.create("test",
new BufferChunk(inBuf, 0), inBuf.remaining()));
for(int i=0; i < COUNT; i += 5) {
- int x = (int) in.next();
+ int x = in.next();
if (i < COUNT/2) {
assertEquals(i & 1, x);
} else {
@@ -116,7 +121,7 @@ public class TestBitFieldReader {
public void testSeekSkip() throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
BitFieldWriter out = new BitFieldWriter(
- new OutStream("test", 100, null, collect), 1);
+ new OutStream("test", new StreamOptions(100), collect), 1);
final int COUNT = 256;
TestInStream.PositionCollector posn = new TestInStream.PositionCollector();
for(int i=0; i < COUNT; ++i) {
@@ -138,7 +143,7 @@ public class TestBitFieldReader {
in.seek(posn);
in.skip(10);
for(int r = 210; r < COUNT; ++r) {
- int x = (int) in.next();
+ int x = in.next();
if (r < COUNT/2) {
assertEquals(r & 1, x);
} else {
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index 3eba3e6..d298ecc 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -29,6 +29,7 @@ import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -101,7 +102,7 @@ public class TestBitPack {
SerializationUtils utils = new SerializationUtils();
int fixedWidth = utils.findClosestNumBits(rangeInput);
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
- OutStream output = new OutStream("test", SIZE, null, collect);
+ OutStream output = new OutStream("test", new StreamOptions(SIZE), collect);
utils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output);
output.flush();
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index fe7b53d..78098b2 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -92,7 +92,7 @@ public class TestInStream {
@Test
public void testUncompressed() throws Exception {
OutputCollector collect = new OutputCollector();
- OutStream out = new OutStream("test", 100, null, collect);
+ OutStream out = new OutStream("test", new StreamOptions(100), collect);
PositionCollector[] positions = new PositionCollector[1024];
for(int i=0; i < 1024; ++i) {
positions[i] = new PositionCollector();
@@ -189,8 +189,9 @@ public class TestInStream {
Key decryptKey = new SecretKeySpec(rawKey, algorithm.getAlgorithm());
StreamName name = new StreamName(0, OrcProto.Stream.Kind.DATA);
byte[] iv = CryptoUtils.createIvForStream(algorithm, name, 0);
+ CompressionCodec codec = new ZlibCodec();
StreamOptions writerOptions = new StreamOptions(500)
- .withCodec(new ZlibCodec())
+ .withCodec(codec, codec.createOptions())
.withEncryption(algorithm, decryptKey, iv);
OutStream out = new OutStream("test", writerOptions, collect);
PositionCollector[] positions = new PositionCollector[ROW_COUNT];
@@ -241,7 +242,9 @@ public class TestInStream {
public void testCompressed() throws Exception {
OutputCollector collect = new OutputCollector();
CompressionCodec codec = new ZlibCodec();
- OutStream out = new OutStream("test", 300, codec, collect);
+ StreamOptions options = new StreamOptions(300)
+ .withCodec(codec, codec.createOptions());
+ OutStream out = new OutStream("test", options, collect);
PositionCollector[] positions = new PositionCollector[1024];
for(int i=0; i < 1024; ++i) {
positions[i] = new PositionCollector();
@@ -275,7 +278,9 @@ public class TestInStream {
public void testCorruptStream() throws Exception {
OutputCollector collect = new OutputCollector();
CompressionCodec codec = new ZlibCodec();
- OutStream out = new OutStream("test", 500, codec, collect);
+ StreamOptions options = new StreamOptions(500)
+ .withCodec(codec, codec.createOptions());
+ OutStream out = new OutStream("test", options, collect);
PositionCollector[] positions = new PositionCollector[1024];
for(int i=0; i < 1024; ++i) {
positions[i] = new PositionCollector();
@@ -319,7 +324,9 @@ public class TestInStream {
public void testDisjointBuffers() throws Exception {
OutputCollector collect = new OutputCollector();
CompressionCodec codec = new ZlibCodec();
- OutStream out = new OutStream("test", 400, codec, collect);
+ StreamOptions options = new StreamOptions(400)
+ .withCodec(codec, codec.createOptions());
+ OutStream out = new OutStream("test", options, collect);
PositionCollector[] positions = new PositionCollector[1024];
DataOutput stream = new DataOutputStream(out);
for(int i=0; i < 1024; ++i) {
@@ -345,9 +352,9 @@ public class TestInStream {
buffers.add(new BufferChunk(buffer, offset));
offset += buffer.remaining();
}
- InStream.StreamOptions options = InStream.options()
+ InStream.StreamOptions inOptions = InStream.options()
.withCodec(codec).withBufferSize(400);
- InStream in = InStream.create("test", buffers.get(), 1674, options);
+ InStream in = InStream.create("test", buffers.get(), 1674, inOptions);
assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
" offset: 0 limit: 483 range 0 = 0 to 483;" +
" range 1 = 483 to 1625; range 2 = 1625 to 1674",
@@ -366,7 +373,7 @@ public class TestInStream {
buffers.clear();
buffers.add(new BufferChunk(inBuf[1], 483));
buffers.add(new BufferChunk(inBuf[2], 1625));
- in = InStream.create("test", buffers.get(), 1674, options);
+ in = InStream.create("test", buffers.get(), 1674, inOptions);
inStream = new DataInputStream(in);
positions[303].reset();
in.seek(positions[303]);
@@ -377,7 +384,7 @@ public class TestInStream {
buffers.clear();
buffers.add(new BufferChunk(inBuf[0], 0));
buffers.add(new BufferChunk(inBuf[2], 1625));
- in = InStream.create("test", buffers.get(), 1674, options);
+ in = InStream.create("test", buffers.get(), 1674, inOptions);
inStream = new DataInputStream(in);
positions[1001].reset();
for(int i=0; i < 300; ++i) {
@@ -392,7 +399,7 @@ public class TestInStream {
@Test
public void testUncompressedDisjointBuffers() throws Exception {
OutputCollector collect = new OutputCollector();
- OutStream out = new OutStream("test", 400, null, collect);
+ OutStream out = new OutStream("test", new StreamOptions(400), collect);
PositionCollector[] positions = new PositionCollector[1024];
DataOutput stream = new DataOutputStream(out);
for(int i=0; i < 1024; ++i) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
index 0888cef..c11caa6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -23,14 +23,19 @@ import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Test;
public class TestIntegerCompressionReader {
public void runSeekTest(CompressionCodec codec) throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ StreamOptions options = new StreamOptions(1000);
+ if (codec != null) {
+ options.withCodec(codec, codec.createOptions());
+ }
RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
- new OutStream("test", 1000, codec, collect), true);
+ new OutStream("test", options, collect), true);
TestInStream.PositionCollector[] positions =
new TestInStream.PositionCollector[4096];
Random random = new Random(99);
@@ -95,7 +100,7 @@ public class TestIntegerCompressionReader {
public void testSkips() throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
- new OutStream("test", 100, null, collect), true);
+ new OutStream("test", new StreamOptions(100), collect), true);
for(int i=0; i < 2048; ++i) {
if (i < 1024) {
out.write(i);
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index 473c7a6..87c1fce 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -56,7 +56,9 @@ public class TestOutStream {
PhysicalWriter.OutputReceiver receiver =
Mockito.mock(PhysicalWriter.OutputReceiver.class);
CompressionCodec codec = new ZlibCodec();
- OutStream stream = new OutStream("test", 128*1024, codec, receiver);
+ StreamOptions options = new StreamOptions(128 * 1024)
+ .withCodec(codec, codec.createOptions());
+ OutStream stream = new OutStream("test", options, receiver);
assertEquals(0L, stream.getBufferSize());
stream.write(new byte[]{0, 1, 2});
stream.flush();
@@ -183,8 +185,9 @@ public class TestOutStream {
Key material = new SecretKeySpec(keyBytes, aes256.getAlgorithm());
StreamName name = new StreamName(0x1, OrcProto.Stream.Kind.DATA);
byte[] iv = CryptoUtils.createIvForStream(aes256, name, 0);
+ CompressionCodec codec = new ZlibCodec();
StreamOptions options = new StreamOptions(1024)
- .withCodec(new ZlibCodec())
+ .withCodec(codec, codec.createOptions())
.withEncryption(aes256, material, iv);
OutStream stream = new OutStream("test", options, receiver);
for(int i=0; i < 10000; ++i) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
index c0b22fa..b3da5d8 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,20 +17,21 @@
*/
package org.apache.orc.impl;
-import static junit.framework.Assert.assertEquals;
-
import java.nio.ByteBuffer;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
public class TestRunLengthByteReader {
@Test
public void testUncompressedSeek() throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
- RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 100,
- null, collect));
+ RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test",
+ new StreamOptions(100), collect));
TestInStream.PositionCollector[] positions =
new TestInStream.PositionCollector[2048];
for(int i=0; i < 2048; ++i) {
@@ -70,9 +71,11 @@ public class TestRunLengthByteReader {
@Test
public void testCompressedSeek() throws Exception {
CompressionCodec codec = new SnappyCodec();
+ StreamOptions options = new StreamOptions(500)
+ .withCodec(codec, codec.createOptions());
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
- RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 500,
- codec, collect));
+ RunLengthByteWriter out = new RunLengthByteWriter(
+ new OutStream("test", options, collect));
TestInStream.PositionCollector[] positions =
new TestInStream.PositionCollector[2048];
for(int i=0; i < 2048; ++i) {
@@ -113,8 +116,8 @@ public class TestRunLengthByteReader {
@Test
public void testSkips() throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
- RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 100,
- null, collect));
+ RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test",
+ new StreamOptions(100), collect));
for(int i=0; i < 2048; ++i) {
if (i < 1024) {
out.write((byte) (i/16));
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
index f067f83..1f5cd41 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -23,14 +23,19 @@ import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Test;
public class TestRunLengthIntegerReader {
public void runSeekTest(CompressionCodec codec) throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ StreamOptions options = new StreamOptions(1000);
+ if (codec != null) {
+ options.withCodec(codec, codec.createOptions());
+ }
RunLengthIntegerWriter out = new RunLengthIntegerWriter(
- new OutStream("test", 1000, codec, collect), true);
+ new OutStream("test", options, collect), true);
TestInStream.PositionCollector[] positions =
new TestInStream.PositionCollector[4096];
Random random = new Random(99);
@@ -94,7 +99,7 @@ public class TestRunLengthIntegerReader {
public void testSkips() throws Exception {
TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
RunLengthIntegerWriter out = new RunLengthIntegerWriter(
- new OutStream("test", 100, null, collect), true);
+ new OutStream("test", new StreamOptions(100), collect), true);
for(int i=0; i < 2048; ++i) {
if (i < 1024) {
out.write(i);
diff --git a/java/core/src/test/org/apache/orc/impl/TestZlib.java b/java/core/src/test/org/apache/orc/impl/TestZlib.java
index 327ecfc..a58965d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestZlib.java
+++ b/java/core/src/test/org/apache/orc/impl/TestZlib.java
@@ -36,7 +36,8 @@ public class TestZlib {
in.put(new byte[]{1,2,3,4,5,6,7,10});
in.flip();
CompressionCodec codec = new ZlibCodec();
- assertEquals(false, codec.compress(in, out, null));
+ assertEquals(false, codec.compress(in, out, null,
+ codec.createOptions()));
}
@Test
diff --git a/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java b/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java
index 4e55c54..441cb02 100644
--- a/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java
+++ b/java/tools/src/test/org/apache/orc/impl/TestRLEv2.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -39,6 +39,7 @@ import org.apache.orc.OrcFile;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.tools.FileDump;
import org.junit.Before;
import org.junit.Rule;
@@ -346,8 +347,8 @@ public class TestRLEv2 {
boolean signed) throws IOException {
TestOutputCatcher catcher = new TestOutputCatcher();
RunLengthIntegerWriterV2 writer =
- new RunLengthIntegerWriterV2(new OutStream("test", 10000, null,
- catcher), signed);
+ new RunLengthIntegerWriterV2(new OutStream("test",
+ new StreamOptions(10000), catcher), signed);
for(long x: input) {
writer.write(x);
}