You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2019/07/03 16:32:35 UTC
[orc] branch master updated: ORC-516: Update InStream for column
compression.
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 7afd89a ORC-516: Update InStream for column compression.
7afd89a is described below
commit 7afd89a855a1e19811371caaaf168c4cac1e2e3f
Author: Owen O'Malley <om...@apache.org>
AuthorDate: Mon Jun 24 13:05:55 2019 -0700
ORC-516: Update InStream for column compression.
Fixes #407
Signed-off-by: Owen O'Malley <om...@apache.org>
---
.../src/java/org/apache/orc/impl/BufferChunk.java | 29 +-
.../java/org/apache/orc/impl/BufferChunkList.java | 14 +
.../src/java/org/apache/orc/impl/InStream.java | 297 +++++++++++++--------
.../src/java/org/apache/orc/impl/ReaderImpl.java | 4 +-
.../java/org/apache/orc/impl/RecordReaderImpl.java | 4 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 6 +-
.../org/apache/orc/impl/TestBitFieldReader.java | 6 +-
.../src/test/org/apache/orc/impl/TestBitPack.java | 2 +-
.../src/test/org/apache/orc/impl/TestInStream.java | 26 +-
.../orc/impl/TestIntegerCompressionReader.java | 4 +-
.../test/org/apache/orc/impl/TestOutStream.java | 15 +-
.../org/apache/orc/impl/TestPhysicalFsWriter.java | 19 +-
.../apache/orc/impl/TestRunLengthByteReader.java | 6 +-
.../orc/impl/TestRunLengthIntegerReader.java | 8 +-
.../org/apache/orc/impl/TestSchemaEvolution.java | 15 +-
15 files changed, 282 insertions(+), 173 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunk.java b/java/core/src/java/org/apache/orc/impl/BufferChunk.java
index afde82f..951a6d8 100644
--- a/java/core/src/java/org/apache/orc/impl/BufferChunk.java
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunk.java
@@ -1,6 +1,4 @@
-package org.apache.orc.impl;
-
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +16,8 @@ package org.apache.orc.impl;
* limitations under the License.
*/
+package org.apache.orc.impl;
+
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.slf4j.Logger;
@@ -34,15 +34,20 @@ public class BufferChunk extends DiskRangeList {
private static final Logger LOG =
LoggerFactory.getLogger(BufferChunk.class);
- final ByteBuffer chunk;
+ private ByteBuffer chunk;
+
+ public BufferChunk(long offset, int length) {
+ super(offset, offset + length);
+ chunk = null;
+ }
public BufferChunk(ByteBuffer chunk, long offset) {
super(offset, offset + chunk.remaining());
this.chunk = chunk;
}
- public ByteBuffer getChunk() {
- return chunk;
+ public void setChunk(ByteBuffer chunk) {
+ this.chunk = chunk;
}
@Override
@@ -52,10 +57,14 @@ public class BufferChunk extends DiskRangeList {
@Override
public final String toString() {
- boolean makesSense = chunk.remaining() == (end - offset);
- return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
- + (makesSense ? "" : "(!)") + " type: " +
- (chunk.isDirect() ? "direct" : "array-backed");
+ if (chunk == null) {
+ return "data range[" + offset + ", " + end +")";
+ } else {
+ boolean makesSense = chunk.remaining() == (end - offset);
+ return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
+ + (makesSense ? "" : "(!)") + " type: " +
+ (chunk.isDirect() ? "direct" : "array-backed");
+ }
}
@Override
diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
index d8a89db..1a0aea9 100644
--- a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
@@ -32,6 +32,7 @@ public class BufferChunkList {
} else {
tail.next = value;
value.prev = tail;
+ value.next = null;
tail = value;
}
}
@@ -40,6 +41,19 @@ public class BufferChunkList {
return head;
}
+ /**
+ * Get the nth element of the list
+ * @param chunk the element number to get from 0
+ * @return the given element number
+ */
+ public BufferChunk get(int chunk) {
+ BufferChunk ptr = head;
+ for(int i=0; i < chunk; ++i) {
+ ptr = ptr == null ? null : (BufferChunk) ptr.next;
+ }
+ return ptr;
+ }
+
public void clear() {
head = null;
tail = null;
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 06f439e..dd4429e 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.Key;
+import java.util.function.Consumer;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
@@ -41,25 +42,25 @@ public abstract class InStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
- protected final String name;
- protected long length;
+ protected final Object name;
+ protected final long offset;
+ protected final long length;
- public InStream(String name, long length) {
+ public InStream(Object name, long offset, long length) {
this.name = name;
+ this.offset = offset;
this.length = length;
}
- public String getStreamName() {
- return name;
- }
-
- public long getStreamLength() {
- return length;
+ public String toString() {
+ return name.toString();
}
@Override
public abstract void close();
+ public abstract void changeIv(Consumer<byte[]> modifier);
+
static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
int result = 0;
DiskRangeList range = list;
@@ -75,8 +76,8 @@ public abstract class InStream extends InputStream {
*/
public static class UncompressedStream extends InStream {
private DiskRangeList bytes;
- private long length;
- protected long currentOffset;
+ // position in the stream (0..length)
+ protected long position;
protected ByteBuffer decrypted;
protected DiskRangeList currentRange;
@@ -86,33 +87,33 @@ public abstract class InStream extends InputStream {
* @param name name of the stream
* @param length the number of bytes for the stream
*/
- public UncompressedStream(String name, long length) {
- super(name, length);
+ public UncompressedStream(Object name, long offset, long length) {
+ super(name, offset, length);
}
- public UncompressedStream(String name,
+ public UncompressedStream(Object name,
DiskRangeList input,
+ long offset,
long length) {
- super(name, length);
- reset(input, length);
+ super(name, offset, length);
+ reset(input);
}
- protected void reset(DiskRangeList input, long length) {
+ protected void reset(DiskRangeList input) {
this.bytes = input;
- this.length = length;
- currentOffset = input == null ? 0 : input.getOffset();
+ position = input == null ? 0 : input.getOffset() - offset;
setCurrent(input, true);
}
@Override
public int read() {
if (decrypted == null || decrypted.remaining() == 0) {
- if (currentOffset == length) {
+ if (position == length) {
return -1;
}
setCurrent(currentRange.next, false);
}
- currentOffset += 1;
+ position += 1;
return 0xff & decrypted.get();
}
@@ -122,21 +123,21 @@ public abstract class InStream extends InputStream {
decrypted = newRange.getData().slice();
// Move the position in the ByteBuffer to match the currentOffset,
// which is relative to the stream.
- decrypted.position((int) (currentOffset - newRange.getOffset()));
+ decrypted.position((int) (position + offset - newRange.getOffset()));
}
}
@Override
public int read(byte[] data, int offset, int length) {
if (decrypted == null || decrypted.remaining() == 0) {
- if (currentOffset == this.length) {
+ if (position == this.length) {
return -1;
}
setCurrent(currentRange.next, false);
}
int actualLength = Math.min(length, decrypted.remaining());
decrypted.get(data, offset, actualLength);
- currentOffset += actualLength;
+ position += actualLength;
return actualLength;
}
@@ -145,19 +146,24 @@ public abstract class InStream extends InputStream {
if (decrypted != null && decrypted.remaining() > 0) {
return decrypted.remaining();
}
- return (int) (length - currentOffset);
+ return (int) (length - position);
}
@Override
public void close() {
currentRange = null;
- currentOffset = length;
+ position = length;
// explicit de-ref of bytes[]
decrypted = null;
bytes = null;
}
@Override
+ public void changeIv(Consumer<byte[]> modifier) {
+ // nothing to do
+ }
+
+ @Override
public void seek(PositionProvider index) throws IOException {
seek(index.getNext());
}
@@ -166,18 +172,20 @@ public abstract class InStream extends InputStream {
if (desired == 0 && bytes == null) {
return;
}
+ // compute the position of the desired point in file
+ long posn = desired + offset;
// If we are seeking inside of the current range, just reposition.
- if (currentRange != null && desired >= currentRange.getOffset() &&
- desired < currentRange.getEnd()) {
- decrypted.position((int) (desired - currentRange.getOffset()));
- currentOffset = desired;
+ if (currentRange != null && posn >= currentRange.getOffset() &&
+ posn < currentRange.getEnd()) {
+ decrypted.position((int) (posn - currentRange.getOffset()));
+ position = desired;
} else {
for (DiskRangeList curRange = bytes; curRange != null;
curRange = curRange.next) {
- if (curRange.getOffset() <= desired &&
- (curRange.next == null ? desired <= curRange.getEnd() :
- desired < curRange.getEnd())) {
- currentOffset = desired;
+ if (curRange.getOffset() <= posn &&
+ (curRange.next == null ? posn <= curRange.getEnd() :
+ posn < curRange.getEnd())) {
+ position = desired;
setCurrent(curRange, true);
return;
}
@@ -189,7 +197,7 @@ public abstract class InStream extends InputStream {
@Override
public String toString() {
- return "uncompressed stream " + name + " position: " + currentOffset +
+ return "uncompressed stream " + name + " position: " + position +
" length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
" offset: " + (decrypted == null ? 0 : decrypted.position()) +
" limit: " + (decrypted == null ? 0 : decrypted.limit());
@@ -209,21 +217,39 @@ public abstract class InStream extends InputStream {
* Manage the state of the decryption, including the ability to seek.
*/
static class EncryptionState {
- private final String name;
+ private final Object name;
private final EncryptionAlgorithm algorithm;
private final Key key;
private final byte[] iv;
private final Cipher cipher;
+ private final long offset;
private ByteBuffer decrypted;
- EncryptionState(String name, StreamOptions options) {
+ EncryptionState(Object name, long offset, StreamOptions options) {
this.name = name;
- algorithm = options.algorithm;
- key = options.key;
- iv = options.iv;
+ this.offset = offset;
+ algorithm = options.getAlgorithm();
+ key = options.getKey();
+ iv = options.getIv();
cipher = algorithm.createCipher();
}
+ void changeIv(Consumer<byte[]> modifier) {
+ modifier.accept(iv);
+ updateIv();
+ OutStream.logKeyAndIv(name, key, iv);
+ }
+
+ private void updateIv() {
+ try {
+ cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+ } catch (InvalidKeyException e) {
+ throw new IllegalArgumentException("Invalid key on " + name, e);
+ } catch (InvalidAlgorithmParameterException e) {
+ throw new IllegalArgumentException("Invalid iv on " + name, e);
+ }
+ }
+
/**
* We are seeking to a new range, so update the cipher to change the IV
* to match. This code assumes that we only support encryption in CTR mode.
@@ -233,35 +259,27 @@ public abstract class InStream extends InputStream {
int blockSize = cipher.getBlockSize();
long encryptionBlocks = offset / blockSize;
long extra = offset % blockSize;
- byte[] advancedIv;
- if (encryptionBlocks == 0) {
- advancedIv = iv;
- } else {
+ CryptoUtils.clearCounter(iv);
+ if (encryptionBlocks != 0) {
// Add the encryption blocks into the initial iv, to compensate for
// skipping over decrypting those bytes.
- advancedIv = new byte[iv.length];
- System.arraycopy(iv, 0, advancedIv, 0, iv.length);
int posn = iv.length - 1;
while (encryptionBlocks > 0) {
- long sum = (advancedIv[posn] & 0xff) + encryptionBlocks;
- advancedIv[posn--] = (byte) sum;
+ long sum = (iv[posn] & 0xff) + encryptionBlocks;
+ iv[posn--] = (byte) sum;
encryptionBlocks = sum / 0x100;
}
}
- try {
- cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(advancedIv));
- // If the range starts at an offset that doesn't match the encryption
- // block, we need to advance some bytes within an encryption block.
- if (extra > 0) {
+ updateIv();
+ // If the range starts at an offset that doesn't match the encryption
+ // block, we need to advance some bytes within an encryption block.
+ if (extra > 0) {
+ try {
byte[] wasted = new byte[(int) extra];
cipher.update(wasted, 0, wasted.length, wasted, 0);
+ } catch (ShortBufferException e) {
+ throw new IllegalArgumentException("Short buffer in " + name, e);
}
- } catch (InvalidKeyException e) {
- throw new IllegalArgumentException("Invalid key on " + name, e);
- } catch (InvalidAlgorithmParameterException e) {
- throw new IllegalArgumentException("Invalid iv on " + name, e);
- } catch (ShortBufferException e) {
- throw new IllegalArgumentException("Short buffer in " + name, e);
}
}
@@ -305,11 +323,11 @@ public abstract class InStream extends InputStream {
public static class EncryptedStream extends UncompressedStream {
private final EncryptionState encrypt;
- public EncryptedStream(String name, DiskRangeList input, long length,
+ public EncryptedStream(Object name, DiskRangeList input, long offset, long length,
StreamOptions options) {
- super(name, length);
- encrypt = new EncryptionState(name, options);
- reset(input, length);
+ super(name, offset, length);
+ encrypt = new EncryptionState(name, offset, options);
+ reset(input);
}
@Override
@@ -317,10 +335,10 @@ public abstract class InStream extends InputStream {
currentRange = newRange;
if (newRange != null) {
if (isJump) {
- encrypt.changeIv(newRange.getOffset());
+ encrypt.changeIv(newRange.getOffset() - offset);
}
decrypted = encrypt.decrypt(newRange);
- decrypted.position((int) (currentOffset - newRange.getOffset()));
+ decrypted.position((int) (position + offset - newRange.getOffset()));
}
}
@@ -331,6 +349,11 @@ public abstract class InStream extends InputStream {
}
@Override
+ public void changeIv(Consumer<byte[]> modifier) {
+ encrypt.changeIv(modifier);
+ }
+
+ @Override
public String toString() {
return "encrypted " + super.toString();
}
@@ -342,7 +365,7 @@ public abstract class InStream extends InputStream {
private ByteBuffer uncompressed;
private final CompressionCodec codec;
protected ByteBuffer compressed;
- protected long currentOffset;
+ protected long position;
protected DiskRangeList currentRange;
private boolean isUncompressedOriginal;
@@ -354,10 +377,11 @@ public abstract class InStream extends InputStream {
* @param length the total number of bytes in the stream
* @param options the options used to read the stream
*/
- public CompressedStream(String name,
+ public CompressedStream(Object name,
+ long offset,
long length,
StreamOptions options) {
- super(name, length);
+ super(name, offset, length);
this.codec = options.codec;
this.bufferSize = options.bufferSize;
}
@@ -369,25 +393,24 @@ public abstract class InStream extends InputStream {
* @param length the total length of the stream
* @param options the options to read the data with
*/
- public CompressedStream(String name,
+ public CompressedStream(Object name,
DiskRangeList input,
+ long offset,
long length,
StreamOptions options) {
- super(name, length);
+ super(name, offset, length);
this.codec = options.codec;
this.bufferSize = options.bufferSize;
- reset(input, length);
+ reset(input);
}
/**
* Reset the input to a new set of data.
* @param input the input data
- * @param length the number of bytes in the stream
*/
- void reset(DiskRangeList input, long length) {
+ void reset(DiskRangeList input) {
bytes = input;
- this.length = length;
- currentOffset = input == null ? 0 : input.getOffset();
+ position = input == null ? 0 : input.getOffset() - offset;
setCurrent(input, true);
}
@@ -400,7 +423,7 @@ public abstract class InStream extends InputStream {
currentRange = newRange;
if (newRange != null) {
compressed = newRange.getData().slice();
- compressed.position((int) (currentOffset - newRange.getOffset()));
+ compressed.position((int) (position + offset - newRange.getOffset()));
}
}
@@ -417,11 +440,11 @@ public abstract class InStream extends InputStream {
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
- bufferSize + " needed = " + chunkLength);
+ bufferSize + " needed = " + chunkLength + " in " + name);
}
// read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
- currentOffset += OutStream.HEADER_SIZE;
+ position += OutStream.HEADER_SIZE;
ByteBuffer slice = this.slice(chunkLength);
@@ -464,7 +487,7 @@ public abstract class InStream extends InputStream {
private boolean ensureUncompressed() throws IOException {
while (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == this.length) {
+ if (position == this.length) {
return false;
}
readHeader();
@@ -485,11 +508,16 @@ public abstract class InStream extends InputStream {
uncompressed = null;
compressed = null;
currentRange = null;
- currentOffset = length;
+ position = length;
bytes = null;
}
@Override
+ public void changeIv(Consumer<byte[]> modifier) {
+ // nothing to do
+ }
+
+ @Override
public void seek(PositionProvider index) throws IOException {
seek(index.getNext());
long uncompressedBytes = index.getNext();
@@ -507,13 +535,13 @@ public abstract class InStream extends InputStream {
private ByteBuffer slice(int chunkLength) throws IOException {
int len = chunkLength;
final DiskRangeList oldRange = currentRange;
- final long oldOffset = currentOffset;
+ final long oldPosition = position;
ByteBuffer slice;
if (compressed.remaining() >= len) {
slice = compressed.slice();
// simple case
slice.limit(len);
- currentOffset += len;
+ position += len;
compressed.position(compressed.position() + len);
return slice;
} else if (currentRange.next == null) {
@@ -531,7 +559,7 @@ public abstract class InStream extends InputStream {
// we need to consolidate 2 or more buffers into 1
// first copy out compressed buffers
ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
- currentOffset += compressed.remaining();
+ position += compressed.remaining();
len -= compressed.remaining();
copy.put(compressed);
@@ -544,18 +572,18 @@ public abstract class InStream extends InputStream {
slice = compressed.slice();
slice.limit(len);
copy.put(slice);
- currentOffset += len;
+ position += len;
compressed.position(compressed.position() + len);
copy.flip();
return copy;
}
- currentOffset += compressed.remaining();
+ position += compressed.remaining();
len -= compressed.remaining();
copy.put(compressed);
}
// restore offsets for exception clarity
- currentOffset = oldOffset;
+ position = oldPosition;
setCurrent(oldRange, true);
throw new IOException("EOF in " + this + " while trying to read " +
chunkLength + " bytes");
@@ -565,11 +593,12 @@ public abstract class InStream extends InputStream {
if (desired == 0 && bytes == null) {
return;
}
+ long posn = desired + offset;
for (DiskRangeList range = bytes; range != null; range = range.next) {
- if (range.getOffset() <= desired &&
- (range.next == null ? desired <= range.getEnd() :
- desired < range.getEnd())) {
- currentOffset = desired;
+ if (range.getOffset() <= posn &&
+ (range.next == null ? posn <= range.getEnd() :
+ posn < range.getEnd())) {
+ position = desired;
setCurrent(range, true);
return;
}
@@ -597,7 +626,7 @@ public abstract class InStream extends InputStream {
@Override
public String toString() {
- return "compressed stream " + name + " position: " + currentOffset +
+ return "compressed stream " + name + " position: " + position +
" length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
" offset: " + (compressed == null ? 0 : compressed.position()) +
" limit: " + (compressed == null ? 0 : compressed.limit()) +
@@ -611,13 +640,14 @@ public abstract class InStream extends InputStream {
private static class EncryptedCompressedStream extends CompressedStream {
private final EncryptionState encrypt;
- public EncryptedCompressedStream(String name,
+ public EncryptedCompressedStream(Object name,
DiskRangeList input,
+ long offset,
long length,
StreamOptions options) {
- super(name, length, options);
- encrypt = new EncryptionState(name, options);
- reset(input, length);
+ super(name, offset, length, options);
+ encrypt = new EncryptionState(name, offset, options);
+ reset(input);
}
@Override
@@ -625,10 +655,10 @@ public abstract class InStream extends InputStream {
currentRange = newRange;
if (newRange != null) {
if (isJump) {
- encrypt.changeIv(newRange.getOffset());
+ encrypt.changeIv(newRange.getOffset() - offset);
}
compressed = encrypt.decrypt(newRange);
- compressed.position((int) (currentOffset - newRange.getOffset()));
+ compressed.position((int) (position + offset - newRange.getOffset()));
}
}
@@ -639,6 +669,11 @@ public abstract class InStream extends InputStream {
}
@Override
+ public void changeIv(Consumer<byte[]> modifier) {
+ encrypt.changeIv(modifier);
+ }
+
+ @Override
public String toString() {
return "encrypted " + super.toString();
}
@@ -653,6 +688,17 @@ public abstract class InStream extends InputStream {
private Key key;
private byte[] iv;
+ public StreamOptions(StreamOptions other) {
+ codec = other.codec;
+ bufferSize = other.bufferSize;
+ algorithm = other.algorithm;
+ key = other.key;
+ iv = other.iv == null ? null : other.iv.clone();
+ }
+
+ public StreamOptions() {
+ }
+
public StreamOptions withCodec(CompressionCodec value) {
this.codec = value;
return this;
@@ -672,10 +718,30 @@ public abstract class InStream extends InputStream {
return this;
}
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
public CompressionCodec getCodec() {
return codec;
}
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public EncryptionAlgorithm getAlgorithm() {
+ return algorithm;
+ }
+
+ public Key getKey() {
+ return key;
+ }
+
+ public byte[] getIv() {
+ return iv;
+ }
+
@Override
public StreamOptions clone() {
try {
@@ -689,6 +755,20 @@ public abstract class InStream extends InputStream {
throw new UnsupportedOperationException("uncloneable", e);
}
}
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("compress: ");
+ buffer.append(codec == null ? "none" : codec.getKind());
+ buffer.append(", buffer size: ");
+ buffer.append(bufferSize);
+ if (key != null) {
+ buffer.append(", encryption: ");
+ buffer.append(algorithm);
+ }
+ return buffer.toString();
+ }
}
public static StreamOptions options() {
@@ -699,24 +779,30 @@ public abstract class InStream extends InputStream {
* Create an input stream from a list of disk ranges with data.
* @param name the name of the stream
* @param input the list of ranges of bytes for the stream; from disk or cache
+ * @param offset the first byte offset of the stream
* @param length the length in bytes of the stream
* @param options the options to read with
* @return an input stream
*/
- public static InStream create(String name,
+ public static InStream create(Object name,
DiskRangeList input,
+ long offset,
long length,
StreamOptions options) {
+ LOG.debug("Reading {} with {} from {} for {}", name, options, offset,
+ length);
if (options == null || options.codec == null) {
if (options == null || options.key == null) {
- return new UncompressedStream(name, input, length);
+ return new UncompressedStream(name, input, offset, length);
} else {
- return new EncryptedStream(name, input, length, options);
+ OutStream.logKeyAndIv(name, options.getKey(), options.getIv());
+ return new EncryptedStream(name, input, offset, length, options);
}
} else if (options.key == null) {
- return new CompressedStream(name, input, length, options);
+ return new CompressedStream(name, input, offset, length, options);
} else {
- return new EncryptedCompressedStream(name, input, length, options);
+ OutStream.logKeyAndIv(name, options.getKey(), options.getIv());
+ return new EncryptedCompressedStream(name, input, offset, length, options);
}
}
@@ -727,10 +813,11 @@ public abstract class InStream extends InputStream {
* @param length the length in bytes of the stream
* @return an input stream
*/
- public static InStream create(String name,
+ public static InStream create(Object name,
DiskRangeList input,
- long length) throws IOException {
- return create(name, input, length, null);
+ long offset,
+ long length) {
+ return create(name, input, offset, length, null);
}
/**
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 8edfaee..ddd53b7 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -412,7 +412,7 @@ public class ReaderImpl implements Reader {
bb.position(footerAbsPos);
bb.limit(footerAbsPos + footerSize);
return OrcProto.Footer.parseFrom(InStream.createCodedInputStream(
- InStream.create("footer", new BufferChunk(bb, 0), footerSize, options)));
+ InStream.create("footer", new BufferChunk(bb, 0), 0, footerSize, options)));
}
public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
@@ -420,7 +420,7 @@ public class ReaderImpl implements Reader {
bb.position(metadataAbsPos);
bb.limit(metadataAbsPos + metadataSize);
return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream(
- InStream.create("metadata", new BufferChunk(bb, 0), metadataSize, options)));
+ InStream.create("metadata", new BufferChunk(bb, 0), 0, metadataSize, options)));
}
private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index c1b9652..bf72982 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -1067,7 +1067,7 @@ public class RecordReaderImpl implements RecordReader {
if (!(range instanceof BufferChunk)) {
continue;
}
- dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+ dataReader.releaseBuffer(range.getData());
}
}
}
@@ -1219,7 +1219,7 @@ public class RecordReaderImpl implements RecordReader {
ranges, streamOffset, streamDesc.getLength());
StreamName name = new StreamName(column, streamDesc.getKind());
streams.put(name, InStream.create(name.toString(), buffers,
- streamDesc.getLength(), options));
+ 0, streamDesc.getLength(), options));
streamOffset += streamDesc.getLength();
}
}
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index d4c57e5..aeb33d4 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -234,7 +234,7 @@ public class RecordReaderUtils {
indexes[column] = OrcProto.RowIndex.parseFrom(
InStream.createCodedInputStream(InStream.create("index",
new BufferChunk(bb, 0),
- stream.getLength(), options)));
+ 0, stream.getLength(), options)));
}
break;
case BLOOM_FILTER:
@@ -246,7 +246,7 @@ public class RecordReaderUtils {
bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
(InStream.createCodedInputStream(InStream.create(
"bloom_filter", new BufferChunk(bb, 0),
- stream.getLength(), options)));
+ 0, stream.getLength(), options)));
}
break;
default:
@@ -271,7 +271,7 @@ public class RecordReaderUtils {
file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
return OrcProto.StripeFooter.parseFrom(
InStream.createCodedInputStream(InStream.create("footer",
- new BufferChunk(tailBuf, 0), tailLength, options)));
+ new BufferChunk(tailBuf, 0), 0, tailLength, options)));
}
@Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index 4590f9b..6f54c77 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -53,7 +53,7 @@ public class TestBitFieldReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
BitFieldReader in = new BitFieldReader(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining(),
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(500)));
for(int i=0; i < COUNT; ++i) {
int x = in.next();
@@ -102,7 +102,7 @@ public class TestBitFieldReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
BitFieldReader in = new BitFieldReader(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining()));
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
for(int i=0; i < COUNT; i += 5) {
int x = in.next();
if (i < COUNT/2) {
@@ -139,7 +139,7 @@ public class TestBitFieldReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
BitFieldReader in = new BitFieldReader(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining()));
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
in.seek(posn);
in.skip(10);
for(int r = 210; r < COUNT; ++r) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index d298ecc..a26b130 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -110,7 +110,7 @@ public class TestBitPack {
inBuf.flip();
long[] buff = new long[SIZE];
utils.readInts(buff, 0, SIZE, fixedWidth,
- InStream.create("test", new BufferChunk(inBuf,0),
+ InStream.create("test", new BufferChunk(inBuf,0), 0,
inBuf.remaining()));
for (int i = 0; i < SIZE; i++) {
buff[i] = utils.zigzagDecode(buff[i]);
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index 1881be3..3b6e28c 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -108,7 +108,7 @@ public class TestInStream {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
- inBuf.remaining());
+ 0, inBuf.remaining());
assertEquals("uncompressed stream test position: 0 length: 1024" +
" range: 0 offset: 0 limit: 1024",
in.toString());
@@ -161,7 +161,7 @@ public class TestInStream {
offset += size;
}
- InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+ InStream in = InStream.create("test", list.get(), 0, collect.buffer.size(),
InStream.options().withEncryption(algorithm, decryptKey,
writerOptions.getIv()));
assertEquals("encrypted uncompressed stream test position: 0 length: 8192" +
@@ -219,7 +219,7 @@ public class TestInStream {
offset += size;
}
- InStream in = InStream.create("test", list.get(), collect.buffer.size(),
+ InStream in = InStream.create("test", list.get(), 0, collect.buffer.size(),
InStream.options()
.withCodec(new ZlibCodec()).withBufferSize(500)
.withEncryption(algorithm, decryptKey, writerOptions.getIv()));
@@ -258,7 +258,7 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+ InStream in = InStream.create("test", new BufferChunk(inBuf, 0), 0,
inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(300));
assertEquals("compressed stream test position: 0 length: 961 range: 0" +
@@ -294,7 +294,7 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", new BufferChunk(inBuf, 0),
+ InStream in = InStream.create("test", new BufferChunk(inBuf, 0), 0,
inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(100));
byte[] contents = new byte[1024];
@@ -310,7 +310,7 @@ public class TestInStream {
inBuf.put((byte) 32);
inBuf.put((byte) 0);
inBuf.flip();
- in = InStream.create("test2", new BufferChunk(inBuf, 0),
+ in = InStream.create("test2", new BufferChunk(inBuf, 0), 0,
inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(300));
try {
@@ -354,7 +354,7 @@ public class TestInStream {
}
InStream.StreamOptions inOptions = InStream.options()
.withCodec(codec).withBufferSize(400);
- InStream in = InStream.create("test", buffers.get(), 1674, inOptions);
+ InStream in = InStream.create("test", buffers.get(), 0, 1674, inOptions);
assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
" offset: 0 limit: 483 range 0 = 0 to 483;" +
" range 1 = 483 to 1625; range 2 = 1625 to 1674",
@@ -373,7 +373,7 @@ public class TestInStream {
buffers.clear();
buffers.add(new BufferChunk(inBuf[1], 483));
buffers.add(new BufferChunk(inBuf[2], 1625));
- in = InStream.create("test", buffers.get(), 1674, inOptions);
+ in = InStream.create("test", buffers.get(), 0, 1674, inOptions);
inStream = new DataInputStream(in);
positions[303].reset();
in.seek(positions[303]);
@@ -384,7 +384,7 @@ public class TestInStream {
buffers.clear();
buffers.add(new BufferChunk(inBuf[0], 0));
buffers.add(new BufferChunk(inBuf[2], 1625));
- in = InStream.create("test", buffers.get(), 1674, inOptions);
+ in = InStream.create("test", buffers.get(), 0, 1674, inOptions);
inStream = new DataInputStream(in);
positions[1001].reset();
for(int i=0; i < 300; ++i) {
@@ -424,7 +424,7 @@ public class TestInStream {
buffers.add(new BufferChunk(inBuf[0], 0));
buffers.add(new BufferChunk(inBuf[1], 1024));
buffers.add(new BufferChunk(inBuf[2], 3072));
- InStream in = InStream.create("test", buffers.get(), 4096);
+ InStream in = InStream.create("test", buffers.get(), 0, 4096);
assertEquals("uncompressed stream test position: 0 length: 4096" +
" range: 0 offset: 0 limit: 1024",
in.toString());
@@ -442,7 +442,7 @@ public class TestInStream {
buffers.clear();
buffers.add(new BufferChunk(inBuf[1], 1024));
buffers.add(new BufferChunk(inBuf[2], 3072));
- in = InStream.create("test", buffers.get(), 4096);
+ in = InStream.create("test", buffers.get(), 0, 4096);
inStream = new DataInputStream(in);
positions[256].reset();
in.seek(positions[256]);
@@ -453,7 +453,7 @@ public class TestInStream {
buffers.clear();
buffers.add(new BufferChunk(inBuf[0], 0));
buffers.add(new BufferChunk(inBuf[2], 3072));
- in = InStream.create("test", buffers.get(), 4096);
+ in = InStream.create("test", buffers.get(), 0, 4096);
inStream = new DataInputStream(in);
positions[768].reset();
for(int i=0; i < 256; ++i) {
@@ -468,7 +468,7 @@ public class TestInStream {
@Test
public void testEmptyDiskRange() throws IOException {
DiskRangeList range = new BufferChunk(ByteBuffer.allocate(0), 0);
- InStream stream = new InStream.UncompressedStream("test", range, 0);
+ InStream stream = new InStream.UncompressedStream("test", range, 0, 0);
assertEquals(0, stream.available());
stream.seek(new PositionProvider() {
@Override
diff --git a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
index 178e768..3f66ace 100644
--- a/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -61,7 +61,7 @@ public class TestIntegerCompressionReader {
inBuf.flip();
RunLengthIntegerReaderV2 in =
new RunLengthIntegerReaderV2(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining(),
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(1000)), true, false);
for(int i=0; i < 2048; ++i) {
int x = (int) in.next();
@@ -114,7 +114,7 @@ public class TestIntegerCompressionReader {
inBuf.flip();
RunLengthIntegerReaderV2 in =
new RunLengthIntegerReaderV2(InStream.create("test",
- new BufferChunk(inBuf, 0),
+ new BufferChunk(inBuf, 0), 0,
inBuf.remaining()), true, false);
for(int i=0; i < 2048; i += 10) {
int x = (int) in.next();
diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
index b905c7e..95dedf6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
import org.apache.orc.CompressionCodec;
import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.InMemoryKeystore;
import org.apache.orc.OrcProto;
import org.apache.orc.PhysicalWriter;
import org.apache.orc.impl.writer.StreamOptions;
@@ -35,22 +36,12 @@ import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.Key;
-import java.security.NoSuchAlgorithmException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class TestOutStream {
- public static final boolean TEST_AES_256;
- static {
- try {
- TEST_AES_256 = Cipher.getMaxAllowedKeyLength("AES") != 128;
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalArgumentException("Unknown algorithm", e);
- }
- }
-
@Test
public void testFlush() throws Exception {
PhysicalWriter.OutputReceiver receiver =
@@ -180,7 +171,7 @@ public class TestOutStream {
@Test
public void testCompression256Encryption() throws Exception {
// disable test if AES_256 is not available
- Assume.assumeTrue(TEST_AES_256);
+ Assume.assumeTrue(InMemoryKeystore.SUPPORTS_AES_256);
TestInStream.OutputCollector receiver = new TestInStream.OutputCollector();
EncryptionAlgorithm aes256 = EncryptionAlgorithm.AES_CTR_256;
byte[] keyBytes = new byte[aes256.keyLength()];
@@ -211,7 +202,7 @@ public class TestOutStream {
// use InStream to decompress it
BufferChunkList ranges = new BufferChunkList();
ranges.add(new BufferChunk(ByteBuffer.wrap(compressed), 0));
- InStream decompressedStream = InStream.create("test", ranges.get(),
+ InStream decompressedStream = InStream.create("test", ranges.get(), 0,
compressed.length,
InStream.options().withCodec(new ZlibCodec()).withBufferSize(1024));
diff --git a/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
index 6d2d298..333bc98 100644
--- a/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
+++ b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java
@@ -57,7 +57,7 @@ public class TestPhysicalFsWriter {
}
@Override
- public void write(int b) throws IOException {
+ public void write(int b) {
contents.add(new byte[]{(byte) b});
}
@@ -97,12 +97,12 @@ public class TestPhysicalFsWriter {
@Override
public FSDataOutputStream append(Path f, int bufferSize,
- Progressable progress) throws IOException {
+ Progressable progress) {
throw new UnsupportedOperationException("append not supported");
}
@Override
- public boolean rename(Path src, Path dst) throws IOException {
+ public boolean rename(Path src, Path dst) {
boolean result = fileContents.containsKey(src) &&
!fileContents.containsKey(dst);
if (result) {
@@ -113,14 +113,14 @@ public class TestPhysicalFsWriter {
}
@Override
- public boolean delete(Path f, boolean recursive) throws IOException {
+ public boolean delete(Path f, boolean recursive) {
boolean result = fileContents.containsKey(f);
fileContents.remove(f);
return result;
}
@Override
- public FileStatus[] listStatus(Path f) throws IOException {
+ public FileStatus[] listStatus(Path f) {
return new FileStatus[]{getFileStatus(f)};
}
@@ -135,12 +135,12 @@ public class TestPhysicalFsWriter {
}
@Override
- public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ public boolean mkdirs(Path f, FsPermission permission) {
return false;
}
@Override
- public FileStatus getFileStatus(Path f) throws IOException {
+ public FileStatus getFileStatus(Path f) {
List<byte[]> contents = fileContents.get(f);
if (contents != null) {
long sum = 0;
@@ -262,7 +262,8 @@ public class TestPhysicalFsWriter {
}
@Override
- public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException {
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool) {
return null;
}
@@ -276,7 +277,7 @@ public class TestPhysicalFsWriter {
}
@Override
- public KeyProvider getKeyProvider(Configuration conf, Random random) throws IOException {
+ public KeyProvider getKeyProvider(Configuration conf, Random random) {
return null;
}
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
index 6887f26..cc81dbc 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -48,7 +48,7 @@ public class TestRunLengthByteReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining()));
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
for(int i=0; i < 2048; ++i) {
int x = in.next() & 0xff;
if (i < 1024) {
@@ -92,7 +92,7 @@ public class TestRunLengthByteReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining(),
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(500)));
for(int i=0; i < 2048; ++i) {
int x = in.next() & 0xff;
@@ -130,7 +130,7 @@ public class TestRunLengthByteReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
- new BufferChunk(inBuf, 0), inBuf.remaining()));
+ new BufferChunk(inBuf, 0), 0, inBuf.remaining()));
for(int i=0; i < 2048; i += 10) {
int x = in.next() & 0xff;
if (i < 1024) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
index 6b29152..ac41e6e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -17,8 +17,6 @@
*/
package org.apache.orc.impl;
-import static junit.framework.Assert.assertEquals;
-
import java.nio.ByteBuffer;
import java.util.Random;
@@ -26,6 +24,8 @@ import org.apache.orc.CompressionCodec;
import org.apache.orc.impl.writer.StreamOptions;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
public class TestRunLengthIntegerReader {
public void runSeekTest(CompressionCodec codec) throws Exception {
@@ -60,7 +60,7 @@ public class TestRunLengthIntegerReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
- ("test", new BufferChunk(inBuf, 0), inBuf.remaining(),
+ ("test", new BufferChunk(inBuf, 0), 0, inBuf.remaining(),
InStream.options().withCodec(codec).withBufferSize(1000)), true);
for(int i=0; i < 2048; ++i) {
int x = (int) in.next();
@@ -112,7 +112,7 @@ public class TestRunLengthIntegerReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
- ("test", new BufferChunk(inBuf, 0), inBuf.remaining()), true);
+ ("test", new BufferChunk(inBuf, 0), 0, inBuf.remaining()), true);
for(int i=0; i < 2048; i += 10) {
int x = (int) in.next();
if (i < 1024) {
diff --git a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
index 1102d12..87478d4 100644
--- a/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
+++ b/java/core/src/test/org/apache/orc/impl/TestSchemaEvolution.java
@@ -23,10 +23,8 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -47,7 +45,6 @@ import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -1581,7 +1578,17 @@ public class TestSchemaEvolution {
buffer[i] = (byte) values[i];
}
ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), 0));
- streams.put(name, InStream.create(name.toString(), ranges.get(), values.length));
+ streams.put(name, InStream.create(name.toString(), ranges.get(), 0,
+ values.length));
+ }
+
+ static ByteBuffer createBuffer(int... values) {
+ ByteBuffer result = ByteBuffer.allocate(values.length);
+ for(int v: values) {
+ result.put((byte) v);
+ }
+ result.flip();
+ return result;
}
@Test