You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2018/07/13 21:28:50 UTC
[2/2] orc git commit: ORC-251: Extend InStream and OutStream to
support encryption.
ORC-251: Extend InStream and OutStream to support encryption.
Fixes #278
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/edbb9673
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/edbb9673
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/edbb9673
Branch: refs/heads/master
Commit: edbb9673d143247633dddcfdbea24f7869b151fe
Parents: f3dd9c1
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 9 09:36:28 2018 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Jul 13 14:28:06 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/orc/CompressionCodec.java | 5 +
.../org/apache/orc/impl/AircompressorCodec.java | 11 +-
.../org/apache/orc/impl/BufferChunkList.java | 47 ++
.../java/org/apache/orc/impl/CryptoUtils.java | 78 +++
.../src/java/org/apache/orc/impl/InStream.java | 554 ++++++++++++++-----
.../src/java/org/apache/orc/impl/OrcTail.java | 3 +-
.../src/java/org/apache/orc/impl/OutStream.java | 118 +++-
.../java/org/apache/orc/impl/ReaderImpl.java | 89 +--
.../org/apache/orc/impl/RecordReaderImpl.java | 6 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 106 ++--
.../orc/impl/SettableUncompressedStream.java | 43 --
.../java/org/apache/orc/impl/SnappyCodec.java | 3 +-
.../java/org/apache/orc/impl/WriterImpl.java | 4 +-
.../src/java/org/apache/orc/impl/ZlibCodec.java | 6 +
.../apache/orc/impl/writer/StreamOptions.java | 85 +++
.../org/apache/orc/impl/TestBitFieldReader.java | 14 +-
.../test/org/apache/orc/impl/TestBitPack.java | 5 +-
.../org/apache/orc/impl/TestCryptoUtils.java | 47 ++
.../test/org/apache/orc/impl/TestInStream.java | 217 ++++++--
.../orc/impl/TestIntegerCompressionReader.java | 15 +-
.../test/org/apache/orc/impl/TestOutStream.java | 164 +++++-
.../orc/impl/TestRunLengthByteReader.java | 7 +-
.../orc/impl/TestRunLengthIntegerReader.java | 7 +-
.../apache/orc/impl/TestSchemaEvolution.java | 8 +-
24 files changed, 1230 insertions(+), 412 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/CompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/CompressionCodec.java b/java/core/src/java/org/apache/orc/CompressionCodec.java
index dd517b3..1d2af57 100644
--- a/java/core/src/java/org/apache/orc/CompressionCodec.java
+++ b/java/core/src/java/org/apache/orc/CompressionCodec.java
@@ -69,4 +69,9 @@ public interface CompressionCodec {
/** Closes the codec, releasing the resources. */
void close();
+
+ /**
+ * Get the compression kind.
+ */
+ CompressionKind getKind();
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
index 39d678c..2609d26 100644
--- a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -21,16 +21,20 @@ package org.apache.orc.impl;
import io.airlift.compress.Compressor;
import io.airlift.compress.Decompressor;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
public class AircompressorCodec implements CompressionCodec {
+ private final CompressionKind kind;
private final Compressor compressor;
private final Decompressor decompressor;
- AircompressorCodec(Compressor compressor, Decompressor decompressor) {
+ AircompressorCodec(CompressionKind kind, Compressor compressor,
+ Decompressor decompressor) {
+ this.kind = kind;
this.compressor = compressor;
this.decompressor = decompressor;
}
@@ -109,4 +113,9 @@ public class AircompressorCodec implements CompressionCodec {
public void close() {
// Nothing to do.
}
+
+ @Override
+ public CompressionKind getKind() {
+ return kind;
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/BufferChunkList.java b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
new file mode 100644
index 0000000..d8a89db
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/BufferChunkList.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * Builds a list of buffer chunks
+ */
+public class BufferChunkList {
+ private BufferChunk head;
+ private BufferChunk tail;
+
+ public void add(BufferChunk value) {
+ if (head == null) {
+ head = value;
+ tail = value;
+ } else {
+ tail.next = value;
+ value.prev = tail;
+ tail = value;
+ }
+ }
+
+ public BufferChunk get() {
+ return head;
+ }
+
+ public void clear() {
+ head = null;
+ tail = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/CryptoUtils.java b/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
new file mode 100644
index 0000000..072b06b
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/CryptoUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.EncryptionAlgorithm;
+import java.security.SecureRandom;
+
+/**
+ * This class has routines to work with encryption within ORC files.
+ */
+public class CryptoUtils {
+
+ private static final int COLUMN_ID_LENGTH = 3;
+ private static final int KIND_LENGTH = 2;
+ private static final int STRIPE_ID_LENGTH = 3;
+ private static final int MIN_COUNT_BYTES = 8;
+
+ static final int MAX_COLUMN = 0xffffff;
+ static final int MAX_KIND = 0xffff;
+ static final int MAX_STRIPE = 0xffffff;
+
+ /**
+ * Create a unique IV for each stream within a single key.
+ * The top bytes are set with the column, stream kind, and stripe id and the
+ * lower 8 bytes are always 0.
+ * @param name the stream name
+ * @param stripeId the stripe id
+ * @return the iv for the stream
+ */
+ public static byte[] createIvForStream(EncryptionAlgorithm algorithm,
+ StreamName name,
+ int stripeId) {
+ byte[] iv = new byte[algorithm.getIvLength()];
+ int columnId = name.getColumn();
+ if (columnId < 0 || columnId > MAX_COLUMN) {
+ throw new IllegalArgumentException("ORC encryption is limited to " +
+ MAX_COLUMN + " columns. Value = " + columnId);
+ }
+ int k = name.getKind().getNumber();
+ if (k < 0 || k > MAX_KIND) {
+ throw new IllegalArgumentException("ORC encryption is limited to " +
+ MAX_KIND + " stream kinds. Value = " + k);
+ }
+ if (stripeId < 0 || stripeId > MAX_STRIPE){
+ throw new IllegalArgumentException("ORC encryption is limited to " +
+ MAX_STRIPE + " stripes. Value = " + stripeId);
+ }
+ // the rest of the iv is used for counting within the stream
+ if (iv.length - (COLUMN_ID_LENGTH + KIND_LENGTH + STRIPE_ID_LENGTH) < MIN_COUNT_BYTES) {
+ throw new IllegalArgumentException("Not enough space in the iv for the count");
+ }
+ iv[0] = (byte)(columnId >> 16);
+ iv[1] = (byte)(columnId >> 8);
+ iv[2] = (byte)columnId;
+ iv[COLUMN_ID_LENGTH] = (byte)(k >> 8);
+ iv[COLUMN_ID_LENGTH+1] = (byte)(k);
+ iv[COLUMN_ID_LENGTH+KIND_LENGTH] = (byte)(stripeId >> 16);
+ iv[COLUMN_ID_LENGTH+KIND_LENGTH+1] = (byte)(stripeId >> 8);
+ iv[COLUMN_ID_LENGTH+KIND_LENGTH+2] = (byte)stripeId;
+ return iv;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 94e9232..06f439e 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,17 +20,22 @@ package org.apache.orc.impl;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
import com.google.protobuf.CodedInputStream;
+import javax.crypto.Cipher;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
+
public abstract class InStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
@@ -55,66 +60,101 @@ public abstract class InStream extends InputStream {
@Override
public abstract void close();
+ static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
+ int result = 0;
+ DiskRangeList range = list;
+ while (range != null && range != current) {
+ result += 1;
+ range = range.next;
+ }
+ return result;
+ }
+
+ /**
+ * Implements a stream over an uncompressed stream.
+ */
public static class UncompressedStream extends InStream {
- private List<DiskRange> bytes;
+ private DiskRangeList bytes;
private long length;
protected long currentOffset;
- private ByteBuffer range;
- private int currentRange;
+ protected ByteBuffer decrypted;
+ protected DiskRangeList currentRange;
+
+ /**
+ * Create the stream without calling reset on it.
+ * This is used for the subclass that needs to do more setup.
+ * @param name name of the stream
+ * @param length the number of bytes for the stream
+ */
+ public UncompressedStream(String name, long length) {
+ super(name, length);
+ }
- public UncompressedStream(String name, List<DiskRange> input, long length) {
+ public UncompressedStream(String name,
+ DiskRangeList input,
+ long length) {
super(name, length);
reset(input, length);
}
- protected void reset(List<DiskRange> input, long length) {
+ protected void reset(DiskRangeList input, long length) {
this.bytes = input;
this.length = length;
- currentRange = 0;
- currentOffset = 0;
- range = null;
+ currentOffset = input == null ? 0 : input.getOffset();
+ setCurrent(input, true);
}
@Override
public int read() {
- if (range == null || range.remaining() == 0) {
+ if (decrypted == null || decrypted.remaining() == 0) {
if (currentOffset == length) {
return -1;
}
- seek(currentOffset);
+ setCurrent(currentRange.next, false);
}
currentOffset += 1;
- return 0xff & range.get();
+ return 0xff & decrypted.get();
+ }
+
+ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ decrypted = newRange.getData().slice();
+ // Move the position in the ByteBuffer to match the currentOffset,
+ // which is relative to the stream.
+ decrypted.position((int) (currentOffset - newRange.getOffset()));
+ }
}
@Override
public int read(byte[] data, int offset, int length) {
- if (range == null || range.remaining() == 0) {
+ if (decrypted == null || decrypted.remaining() == 0) {
if (currentOffset == this.length) {
return -1;
}
- seek(currentOffset);
+ setCurrent(currentRange.next, false);
}
- int actualLength = Math.min(length, range.remaining());
- range.get(data, offset, actualLength);
+ int actualLength = Math.min(length, decrypted.remaining());
+ decrypted.get(data, offset, actualLength);
currentOffset += actualLength;
return actualLength;
}
@Override
public int available() {
- if (range != null && range.remaining() > 0) {
- return range.remaining();
+ if (decrypted != null && decrypted.remaining() > 0) {
+ return decrypted.remaining();
}
return (int) (length - currentOffset);
}
@Override
public void close() {
- currentRange = bytes.size();
+ currentRange = null;
currentOffset = length;
// explicit de-ref of bytes[]
- bytes.clear();
+ decrypted = null;
+ bytes = null;
}
@Override
@@ -122,45 +162,37 @@ public abstract class InStream extends InputStream {
seek(index.getNext());
}
- public void seek(long desired) {
- if (desired == 0 && bytes.isEmpty()) {
+ public void seek(long desired) throws IOException {
+ if (desired == 0 && bytes == null) {
return;
}
- int i = 0;
- for (DiskRange curRange : bytes) {
- if (curRange.getOffset() <= desired &&
- (desired - curRange.getOffset()) < curRange.getLength()) {
- currentOffset = desired;
- currentRange = i;
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
- }
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+ // If we are seeking inside of the current range, just reposition.
+ if (currentRange != null && desired >= currentRange.getOffset() &&
+ desired < currentRange.getEnd()) {
+ decrypted.position((int) (desired - currentRange.getOffset()));
currentOffset = desired;
- currentRange = segments - 1;
- DiskRange curRange = bytes.get(currentRange);
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
+ } else {
+ for (DiskRangeList curRange = bytes; curRange != null;
+ curRange = curRange.next) {
+ if (curRange.getOffset() <= desired &&
+ (curRange.next == null ? desired <= curRange.getEnd() :
+ desired < curRange.getEnd())) {
+ currentOffset = desired;
+ setCurrent(curRange, true);
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Seek in " + name + " to " +
+ desired + " is outside of the data");
}
- throw new IllegalArgumentException("Seek in " + name + " to " +
- desired + " is outside of the data");
}
@Override
public String toString() {
return "uncompressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+ " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
+ " offset: " + (decrypted == null ? 0 : decrypted.position()) +
+ " limit: " + (decrypted == null ? 0 : decrypted.limit());
}
}
@@ -173,33 +205,208 @@ public abstract class InStream extends InputStream {
}
}
+ /**
+ * Manage the state of the decryption, including the ability to seek.
+ */
+ static class EncryptionState {
+ private final String name;
+ private final EncryptionAlgorithm algorithm;
+ private final Key key;
+ private final byte[] iv;
+ private final Cipher cipher;
+ private ByteBuffer decrypted;
+
+ EncryptionState(String name, StreamOptions options) {
+ this.name = name;
+ algorithm = options.algorithm;
+ key = options.key;
+ iv = options.iv;
+ cipher = algorithm.createCipher();
+ }
+
+ /**
+ * We are seeking to a new range, so update the cipher to change the IV
+ * to match. This code assumes that we only support encryption in CTR mode.
+ * @param offset where we are seeking to in the stream
+ */
+ void changeIv(long offset) {
+ int blockSize = cipher.getBlockSize();
+ long encryptionBlocks = offset / blockSize;
+ long extra = offset % blockSize;
+ byte[] advancedIv;
+ if (encryptionBlocks == 0) {
+ advancedIv = iv;
+ } else {
+ // Add the encryption blocks into the initial iv, to compensate for
+ // skipping over decrypting those bytes.
+ advancedIv = new byte[iv.length];
+ System.arraycopy(iv, 0, advancedIv, 0, iv.length);
+ int posn = iv.length - 1;
+ while (encryptionBlocks > 0) {
+ long sum = (advancedIv[posn] & 0xff) + encryptionBlocks;
+ advancedIv[posn--] = (byte) sum;
+ encryptionBlocks = sum / 0x100;
+ }
+ }
+ try {
+ cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(advancedIv));
+ // If the range starts at an offset that doesn't match the encryption
+ // block, we need to advance some bytes within an encryption block.
+ if (extra > 0) {
+ byte[] wasted = new byte[(int) extra];
+ cipher.update(wasted, 0, wasted.length, wasted, 0);
+ }
+ } catch (InvalidKeyException e) {
+ throw new IllegalArgumentException("Invalid key on " + name, e);
+ } catch (InvalidAlgorithmParameterException e) {
+ throw new IllegalArgumentException("Invalid iv on " + name, e);
+ } catch (ShortBufferException e) {
+ throw new IllegalArgumentException("Short buffer in " + name, e);
+ }
+ }
+
+ /**
+ * Decrypt the given range into the decrypted buffer. It is assumed that
+ * the cipher is correctly initialized by changeIv before this is called.
+ * @param newRange the range to decrypt
+ * @return a reused ByteBuffer, which is used by each call to decrypt
+ */
+ ByteBuffer decrypt(DiskRangeList newRange) {
+ final long offset = newRange.getOffset();
+ final int length = newRange.getLength();
+ if (decrypted == null || decrypted.capacity() < length) {
+ decrypted = ByteBuffer.allocate(length);
+ } else {
+ decrypted.clear();
+ }
+ ByteBuffer encrypted = newRange.getData().duplicate();
+ try {
+ int output = cipher.update(encrypted, decrypted);
+ if (output != length) {
+ throw new IllegalArgumentException("Problem decrypting " + name +
+ " at " + offset);
+ }
+ } catch (ShortBufferException e) {
+ throw new IllegalArgumentException("Problem decrypting " + name +
+ " at " + offset, e);
+ }
+ decrypted.flip();
+ return decrypted;
+ }
+
+ void close() {
+ decrypted = null;
+ }
+ }
+
+ /**
+ * Implements a stream over an encrypted, but uncompressed stream.
+ */
+ public static class EncryptedStream extends UncompressedStream {
+ private final EncryptionState encrypt;
+
+ public EncryptedStream(String name, DiskRangeList input, long length,
+ StreamOptions options) {
+ super(name, length);
+ encrypt = new EncryptionState(name, options);
+ reset(input, length);
+ }
+
+ @Override
+ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ if (isJump) {
+ encrypt.changeIv(newRange.getOffset());
+ }
+ decrypted = encrypt.decrypt(newRange);
+ decrypted.position((int) (currentOffset - newRange.getOffset()));
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ encrypt.close();
+ }
+
+ @Override
+ public String toString() {
+ return "encrypted " + super.toString();
+ }
+ }
+
private static class CompressedStream extends InStream {
- private final List<DiskRange> bytes;
+ private DiskRangeList bytes;
private final int bufferSize;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
- private ByteBuffer compressed;
- private long currentOffset;
- private int currentRange;
+ protected ByteBuffer compressed;
+ protected long currentOffset;
+ protected DiskRangeList currentRange;
private boolean isUncompressedOriginal;
- public CompressedStream(String name, List<DiskRange> input, long length,
- CompressionCodec codec, int bufferSize) {
+ /**
+ * Create the stream without resetting the input stream.
+ * This is used in subclasses so they can finish initializing before
+ * reset is called.
+ * @param name the name of the stream
+ * @param length the total number of bytes in the stream
+ * @param options the options used to read the stream
+ */
+ public CompressedStream(String name,
+ long length,
+ StreamOptions options) {
super(name, length);
- this.bytes = input;
- this.codec = codec;
- this.bufferSize = bufferSize;
- currentOffset = 0;
- currentRange = 0;
+ this.codec = options.codec;
+ this.bufferSize = options.bufferSize;
+ }
+
+ /**
+ * Create the stream and initialize the input for the stream.
+ * @param name the name of the stream
+ * @param input the input data
+ * @param length the total length of the stream
+ * @param options the options to read the data with
+ */
+ public CompressedStream(String name,
+ DiskRangeList input,
+ long length,
+ StreamOptions options) {
+ super(name, length);
+ this.codec = options.codec;
+ this.bufferSize = options.bufferSize;
+ reset(input, length);
+ }
+
+ /**
+ * Reset the input to a new set of data.
+ * @param input the input data
+ * @param length the number of bytes in the stream
+ */
+ void reset(DiskRangeList input, long length) {
+ bytes = input;
+ this.length = length;
+ currentOffset = input == null ? 0 : input.getOffset();
+ setCurrent(input, true);
}
private void allocateForUncompressed(int size, boolean isDirect) {
uncompressed = allocateBuffer(size, isDirect);
}
+ protected void setCurrent(DiskRangeList newRange,
+ boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ compressed = newRange.getData().slice();
+ compressed.position((int) (currentOffset - newRange.getOffset()));
+ }
+ }
+
private void readHeader() throws IOException {
if (compressed == null || compressed.remaining() <= 0) {
- seek(currentOffset);
+ setCurrent(currentRange.next, false);
}
if (compressed.remaining() > OutStream.HEADER_SIZE) {
int b0 = compressed.get() & 0xff;
@@ -277,9 +484,9 @@ public abstract class InStream extends InputStream {
public void close() {
uncompressed = null;
compressed = null;
- currentRange = bytes.size();
+ currentRange = null;
currentOffset = length;
- bytes.clear();
+ bytes = null;
}
@Override
@@ -299,6 +506,7 @@ public abstract class InStream extends InputStream {
/* slices a read only contiguous buffer of chunkLength */
private ByteBuffer slice(int chunkLength) throws IOException {
int len = chunkLength;
+ final DiskRangeList oldRange = currentRange;
final long oldOffset = currentOffset;
ByteBuffer slice;
if (compressed.remaining() >= len) {
@@ -308,7 +516,7 @@ public abstract class InStream extends InputStream {
currentOffset += len;
compressed.position(compressed.position() + len);
return slice;
- } else if (currentRange >= (bytes.size() - 1)) {
+ } else if (currentRange.next == null) {
// nothing has been modified yet
throw new IOException("EOF in " + this + " while trying to read " +
chunkLength + " bytes");
@@ -326,21 +534,19 @@ public abstract class InStream extends InputStream {
currentOffset += compressed.remaining();
len -= compressed.remaining();
copy.put(compressed);
- ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
- while (len > 0 && iter.hasNext()) {
- ++currentRange;
+ while (currentRange.next != null) {
+ setCurrent(currentRange.next, false);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
}
- DiskRange range = iter.next();
- compressed = range.getData().duplicate();
if (compressed.remaining() >= len) {
slice = compressed.slice();
slice.limit(len);
copy.put(slice);
currentOffset += len;
compressed.position(compressed.position() + len);
+ copy.flip();
return copy;
}
currentOffset += compressed.remaining();
@@ -349,37 +555,24 @@ public abstract class InStream extends InputStream {
}
// restore offsets for exception clarity
- seek(oldOffset);
+ currentOffset = oldOffset;
+ setCurrent(oldRange, true);
throw new IOException("EOF in " + this + " while trying to read " +
chunkLength + " bytes");
}
- private void seek(long desired) throws IOException {
- if (desired == 0 && bytes.isEmpty()) {
+ void seek(long desired) throws IOException {
+ if (desired == 0 && bytes == null) {
return;
}
- int i = 0;
- for (DiskRange range : bytes) {
- if (range.getOffset() <= desired && desired < range.getEnd()) {
- currentRange = i;
- compressed = range.getData().duplicate();
- int pos = compressed.position();
- pos += (int)(desired - range.getOffset());
- compressed.position(pos);
+ for (DiskRangeList range = bytes; range != null; range = range.next) {
+ if (range.getOffset() <= desired &&
+ (range.next == null ? desired <= range.getEnd() :
+ desired < range.getEnd())) {
currentOffset = desired;
+ setCurrent(range, true);
return;
}
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
- DiskRange range = bytes.get(segments - 1);
- currentRange = segments - 1;
- compressed = range.getData().duplicate();
- compressed.position(compressed.limit());
- currentOffset = desired;
- return;
}
throw new IOException("Seek outside of data in " + this + " to " + desired);
}
@@ -387,12 +580,16 @@ public abstract class InStream extends InputStream {
private String rangeString() {
StringBuilder builder = new StringBuilder();
int i = 0;
- for (DiskRange range : bytes) {
+ for (DiskRangeList range = bytes; range != null; range = range.next){
if (i != 0) {
builder.append("; ");
}
- builder.append(" range " + i + " = " + range.getOffset()
- + " to " + (range.getEnd() - range.getOffset()));
+ builder.append(" range ");
+ builder.append(i);
+ builder.append(" = ");
+ builder.append(range.getOffset());
+ builder.append(" to ");
+ builder.append(range.getEnd());
++i;
}
return builder.toString();
@@ -401,8 +598,9 @@ public abstract class InStream extends InputStream {
@Override
public String toString() {
return "compressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+ " length: " + length + " range: " + getRangeNumber(bytes, currentRange) +
+ " offset: " + (compressed == null ? 0 : compressed.position()) +
+ " limit: " + (compressed == null ? 0 : compressed.limit()) +
rangeString() +
(uncompressed == null ? "" :
" uncompressed: " + uncompressed.position() + " to " +
@@ -410,33 +608,116 @@ public abstract class InStream extends InputStream {
}
}
+ private static class EncryptedCompressedStream extends CompressedStream {
+ private final EncryptionState encrypt;
+
+ public EncryptedCompressedStream(String name,
+ DiskRangeList input,
+ long length,
+ StreamOptions options) {
+ super(name, length, options);
+ encrypt = new EncryptionState(name, options);
+ reset(input, length);
+ }
+
+ @Override
+ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
+ currentRange = newRange;
+ if (newRange != null) {
+ if (isJump) {
+ encrypt.changeIv(newRange.getOffset());
+ }
+ compressed = encrypt.decrypt(newRange);
+ compressed.position((int) (currentOffset - newRange.getOffset()));
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ encrypt.close();
+ }
+
+ @Override
+ public String toString() {
+ return "encrypted " + super.toString();
+ }
+ }
+
public abstract void seek(PositionProvider index) throws IOException;
+ public static class StreamOptions implements Cloneable {
+ private CompressionCodec codec;
+ private int bufferSize;
+ private EncryptionAlgorithm algorithm;
+ private Key key;
+ private byte[] iv;
+
+ public StreamOptions withCodec(CompressionCodec value) {
+ this.codec = value;
+ return this;
+ }
+
+ public StreamOptions withBufferSize(int value) {
+ bufferSize = value;
+ return this;
+ }
+
+ public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
+ Key key,
+ byte[] iv) {
+ this.algorithm = algorithm;
+ this.key = key;
+ this.iv = iv;
+ return this;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+
+ @Override
+ public StreamOptions clone() {
+ try {
+ StreamOptions clone = (StreamOptions) super.clone();
+ if (clone.codec != null) {
+ // Make sure we don't share the same codec between two readers.
+ clone.codec = OrcCodecPool.getCodec(codec.getKind());
+ }
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new UnsupportedOperationException("uncloneable", e);
+ }
+ }
+ }
+
+ public static StreamOptions options() {
+ return new StreamOptions();
+ }
+
/**
- * Create an input stream from a list of buffers.
- * @param streamName the name of the stream
- * @param buffers the list of ranges of bytes for the stream
- * @param offsets a list of offsets (the same length as input) that must
- * contain the first offset of the each set of bytes in input
+ * Create an input stream from a list of disk ranges with data.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
* @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
+ * @param options the options to read with
* @return an input stream
- * @throws IOException
*/
- //@VisibleForTesting
- @Deprecated
- public static InStream create(String streamName,
- ByteBuffer[] buffers,
- long[] offsets,
+ public static InStream create(String name,
+ DiskRangeList input,
long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
- for (int i = 0; i < buffers.length; ++i) {
- input.add(new BufferChunk(buffers[i], offsets[i]));
+ StreamOptions options) {
+ if (options == null || options.codec == null) {
+ if (options == null || options.key == null) {
+ return new UncompressedStream(name, input, length);
+ } else {
+ return new EncryptedStream(name, input, length, options);
+ }
+ } else if (options.key == null) {
+ return new CompressedStream(name, input, length, options);
+ } else {
+ return new EncryptedCompressedStream(name, input, length, options);
}
- return create(streamName, input, length, codec, bufferSize);
}
/**
@@ -444,41 +725,22 @@ public abstract class InStream extends InputStream {
* @param name the name of the stream
* @param input the list of ranges of bytes for the stream; from disk or cache
* @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
* @return an input stream
- * @throws IOException
*/
public static InStream create(String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- if (codec == null) {
- return new UncompressedStream(name, input, length);
- } else {
- return new CompressedStream(name, input, length, codec, bufferSize);
- }
+ DiskRangeList input,
+ long length) throws IOException {
+ return create(name, input, length, null);
}
/**
- * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
+ * Creates coded input stream (used for protobuf message parsing) with higher
+ * message size limit.
*
- * @param name the name of the stream
- * @param input the list of ranges of bytes for the stream; from disk or cache
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
+ * @param inStream the stream to wrap.
* @return coded input stream
- * @throws IOException
*/
- public static CodedInputStream createCodedInputStream(
- String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- InStream inStream = create(name, input, length, codec, bufferSize);
+ public static CodedInputStream createCodedInputStream(InStream inStream) {
CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
return codedInputStream;
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java
index 3c78874..9e8a5f2 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcTail.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java
@@ -109,7 +109,8 @@ public final class OrcTail {
CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind());
try {
metadata = extractMetadata(serializedTail, 0,
- (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize());
+ (int) fileTail.getPostscript().getMetadataLength(),
+ InStream.options().withCodec(codec).withBufferSize(getCompressionBufferSize()));
} finally {
OrcCodecPool.returnCodec(getCompressionKind(), codec);
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index d6302d5..435f43c 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,13 +18,26 @@
package org.apache.orc.impl;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.writer.StreamOptions;
+import javax.crypto.BadPaddingException;
+import javax.crypto.Cipher;
+import javax.crypto.IllegalBlockSizeException;
+import javax.crypto.ShortBufferException;
+import javax.crypto.spec.IvParameterSpec;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.Key;
+/**
+ * The output stream for writing to ORC files.
+ * It handles both compression and encryption.
+ */
public class OutStream extends PositionedOutputStream {
-
public static final int HEADER_SIZE = 3;
private final String name;
private final PhysicalWriter.OutputReceiver receiver;
@@ -55,19 +68,89 @@ public class OutStream extends PositionedOutputStream {
private final CompressionCodec codec;
private long compressedBytes = 0;
private long uncompressedBytes = 0;
+ private final Cipher cipher;
+ private final Key key;
public OutStream(String name,
int bufferSize,
CompressionCodec codec,
- PhysicalWriter.OutputReceiver receiver) throws IOException {
+ PhysicalWriter.OutputReceiver receiver) {
+ this(name, new StreamOptions(bufferSize).withCodec(codec), receiver);
+ }
+
+ public OutStream(String name,
+ StreamOptions options,
+ PhysicalWriter.OutputReceiver receiver) {
this.name = name;
- this.bufferSize = bufferSize;
- this.codec = codec;
+ this.bufferSize = options.getBufferSize();
+ this.codec = options.getCodec();
this.receiver = receiver;
+ if (options.isEncrypted()) {
+ this.cipher = options.getAlgorithm().createCipher();
+ this.key = options.getKey();
+ changeIv(options.getIv());
+ } else {
+ this.cipher = null;
+ this.key = null;
+ }
}
- public void clear() throws IOException {
- flush();
+ /**
+ * Change the current Initialization Vector (IV) for the encryption.
+ */
+ void changeIv(byte[] newIv) {
+ try {
+ cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(newIv));
+ } catch (InvalidKeyException e) {
+ throw new IllegalStateException("ORC bad encryption key for " +
+ toString(), e);
+ } catch (InvalidAlgorithmParameterException e) {
+ throw new IllegalStateException("ORC bad encryption parameter for " +
+ toString(), e);
+ }
+ }
+
+ /**
+ * When a buffer is done, we send it to the receiver to store.
+ * If we are encrypting, encrypt the buffer before we pass it on.
+ * @param buffer the buffer to store
+ */
+ void outputBuffer(ByteBuffer buffer) throws IOException {
+ if (cipher != null) {
+ ByteBuffer output = buffer.duplicate();
+ int len = buffer.remaining();
+ try {
+ int encrypted = cipher.update(buffer, output);
+ output.flip();
+ receiver.output(output);
+ if (encrypted != len) {
+ throw new IllegalArgumentException("Encryption of incomplete buffer "
+ + len + " -> " + encrypted + " in " + toString());
+ }
+ } catch (ShortBufferException e) {
+ throw new IOException("Short buffer in encryption in " + toString(), e);
+ }
+ } else {
+ receiver.output(buffer);
+ }
+ }
+
+ /**
+ * Ensure that the cipher didn't save any data.
+ * The next call should be to changeIv to restart the encryption on a new IV.
+ */
+ void finishEncryption() {
+ try {
+ byte[] finalBytes = cipher.doFinal();
+ if (finalBytes != null && finalBytes.length != 0) {
+ throw new IllegalStateException("We shouldn't have remaining bytes " +
+ toString());
+ }
+ } catch (IllegalBlockSizeException e) {
+ throw new IllegalArgumentException("Bad block size", e);
+ } catch (BadPaddingException e) {
+ throw new IllegalArgumentException("Bad padding", e);
+ }
}
/**
@@ -90,7 +173,7 @@ public class OutStream extends PositionedOutputStream {
buffer.put(position + 2, (byte) (val >> 15));
}
- private void getNewInputBuffer() throws IOException {
+ private void getNewInputBuffer() {
if (codec == null) {
current = ByteBuffer.allocate(bufferSize);
} else {
@@ -117,11 +200,11 @@ public class OutStream extends PositionedOutputStream {
/**
* Allocate a new output buffer if we are compressing.
*/
- private ByteBuffer getNewOutputBuffer() throws IOException {
+ private ByteBuffer getNewOutputBuffer() {
return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
}
- private void flip() throws IOException {
+ private void flip() {
current.limit(current.position());
current.position(codec == null ? 0 : HEADER_SIZE);
}
@@ -165,7 +248,7 @@ public class OutStream extends PositionedOutputStream {
}
flip();
if (codec == null) {
- receiver.output(current);
+ outputBuffer(current);
getNewInputBuffer();
} else {
if (compressed == null) {
@@ -190,7 +273,7 @@ public class OutStream extends PositionedOutputStream {
// if we have less than the next header left, spill it.
if (compressed.remaining() < HEADER_SIZE) {
compressed.flip();
- receiver.output(compressed);
+ outputBuffer(compressed);
compressed = overflow;
overflow = null;
}
@@ -203,7 +286,7 @@ public class OutStream extends PositionedOutputStream {
if (sizePosn != 0) {
compressed.position(sizePosn);
compressed.flip();
- receiver.output(compressed);
+ outputBuffer(compressed);
compressed = null;
// if we have an overflow, clear it and make it the new compress
// buffer
@@ -223,14 +306,14 @@ public class OutStream extends PositionedOutputStream {
current.position(0);
// update the header with the current length
writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
- receiver.output(current);
+ outputBuffer(current);
getNewInputBuffer();
}
}
}
@Override
- public void getPosition(PositionRecorder recorder) throws IOException {
+ public void getPosition(PositionRecorder recorder) {
if (codec == null) {
recorder.addPosition(uncompressedBytes);
} else {
@@ -244,7 +327,10 @@ public class OutStream extends PositionedOutputStream {
spill();
if (compressed != null && compressed.position() != 0) {
compressed.flip();
- receiver.output(compressed);
+ outputBuffer(compressed);
+ }
+ if (cipher != null) {
+ finishEncryption();
}
compressed = null;
uncompressedBytes = 0;
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index bba580f..3919988 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcProto;
@@ -275,31 +274,6 @@ public class ReaderImpl implements Reader {
}
/**
- * Ensure this is an ORC file to prevent users from trying to read text
- * files or RC files as ORC files.
- * @param psLen the postscript length
- * @param buffer the tail of the file
- */
- protected static void ensureOrcFooter(ByteBuffer buffer, int psLen) throws IOException {
- int magicLength = OrcFile.MAGIC.length();
- int fullLength = magicLength + 1;
- if (psLen < fullLength || buffer.remaining() < fullLength) {
- throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
- }
-
- int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
- byte[] array = buffer.array();
- // now look for the magic string at the end of the postscript.
- if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
- // if it isn't there, this may be 0.11.0 version of the ORC file.
- // Read the first 3 bytes from the buffer to check for the header
- if (!Text.decode(buffer.array(), 0, magicLength).equals(OrcFile.MAGIC)) {
- throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
- }
- }
- }
-
- /**
* Build a version string out of an array.
* @param version the version number as a list
* @return the human readable form of the version string
@@ -405,26 +379,20 @@ public class ReaderImpl implements Reader {
return OrcFile.WriterVersion.FUTURE;
}
- static List<DiskRange> singleton(DiskRange item) {
- List<DiskRange> result = new ArrayList<>();
- result.add(item);
- return result;
- }
-
private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
- int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+ int footerSize, InStream.StreamOptions options) throws IOException {
bb.position(footerAbsPos);
bb.limit(footerAbsPos + footerSize);
- return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
- singleton(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+ return OrcProto.Footer.parseFrom(InStream.createCodedInputStream(
+ InStream.create("footer", new BufferChunk(bb, 0), footerSize, options)));
}
public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
- int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+ int metadataSize, InStream.StreamOptions options) throws IOException {
bb.position(metadataAbsPos);
bb.limit(metadataAbsPos + metadataSize);
- return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
- singleton(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+ return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream(
+ InStream.create("metadata", new BufferChunk(bb, 0), metadataSize, options)));
}
private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
@@ -450,41 +418,6 @@ public class ReaderImpl implements Reader {
return ps;
}
- public static OrcTail extractFileTail(ByteBuffer buffer)
- throws IOException {
- return extractFileTail(buffer, -1, -1);
- }
-
- public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long modificationTime)
- throws IOException {
- int readSize = buffer.limit();
- int psLen = buffer.get(readSize - 1) & 0xff;
- int psOffset = readSize - 1 - psLen;
- ensureOrcFooter(buffer, psLen);
- byte[] psBuffer = new byte[psLen];
- System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
- OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
- int footerSize = (int) ps.getFooterLength();
- CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name());
- OrcProto.FileTail.Builder fileTailBuilder;
- CompressionCodec codec = OrcCodecPool.getCodec(kind);
- try {
- OrcProto.Footer footer = extractFooter(buffer,
- (int) (buffer.position() + ps.getMetadataLength()),
- footerSize, codec, (int) ps.getCompressionBlockSize());
- fileTailBuilder = OrcProto.FileTail.newBuilder()
- .setPostscriptLength(psLen)
- .setPostscript(ps)
- .setFooter(footer)
- .setFileLength(fileLength);
- } finally {
- OrcCodecPool.returnCodec(kind, codec);
- }
- // clear does not clear the contents but sets position to 0 and limit = capacity
- buffer.clear();
- return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
- }
-
/**
* Build a virtual OrcTail for empty files.
* @return a new OrcTail
@@ -599,7 +532,8 @@ public class ReaderImpl implements Reader {
OrcProto.Footer footer;
CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
try {
- footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize);
+ footer = extractFooter(footerBuffer, 0, footerSize,
+ InStream.options().withCodec(codec).withBufferSize(bufferSize));
} finally {
OrcCodecPool.returnCodec(compressionKind, codec);
}
@@ -788,7 +722,8 @@ public class ReaderImpl implements Reader {
if (metadata == null) {
CompressionCodec codec = OrcCodecPool.getCodec(compressionKind);
try {
- metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+ metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize,
+ InStream.options().withCodec(codec).withBufferSize(bufferSize));
} finally {
OrcCodecPool.returnCodec(compressionKind, codec);
}
@@ -803,10 +738,6 @@ public class ReaderImpl implements Reader {
return result;
}
- public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
- return userMetadata;
- }
-
@Override
public List<Integer> getVersionList() {
return versionList;
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 8bb4d9a..a8e0be1 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -1160,6 +1160,8 @@ public class RecordReaderImpl implements RecordReader {
int bufferSize,
Map<StreamName, InStream> streams) throws IOException {
long streamOffset = 0;
+ InStream.StreamOptions options = InStream.options().withCodec(codec)
+ .withBufferSize(bufferSize);
for (OrcProto.Stream streamDesc : streamDescriptions) {
int column = streamDesc.getColumn();
if ((includeColumn != null &&
@@ -1169,11 +1171,11 @@ public class RecordReaderImpl implements RecordReader {
streamOffset += streamDesc.getLength();
continue;
}
- List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+ DiskRangeList buffers = RecordReaderUtils.getStreamBuffers(
ranges, streamOffset, streamDesc.getLength());
StreamName name = new StreamName(column, streamDesc.getKind());
streams.put(name, InStream.create(name.toString(), buffers,
- streamDesc.getLength(), codec, bufferSize));
+ streamDesc.getLength(), options));
streamOffset += streamDesc.getLength();
}
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 705e768..a70c988 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -150,8 +150,7 @@ public class RecordReaderUtils {
private final FileSystem fs;
private final Path path;
private final boolean useZeroCopy;
- private CompressionCodec codec;
- private final int bufferSize;
+ private InStream.StreamOptions options = InStream.options();
private final int typeCount;
private CompressionKind compressionKind;
@@ -160,8 +159,8 @@ public class RecordReaderUtils {
this.path = properties.getPath();
this.useZeroCopy = properties.getZeroCopy();
this.compressionKind = properties.getCompression();
- this.codec = OrcCodecPool.getCodec(compressionKind);
- this.bufferSize = properties.getBufferSize();
+ options.withCodec(OrcCodecPool.getCodec(compressionKind))
+ .withBufferSize(properties.getBufferSize());
this.typeCount = properties.getTypeCount();
}
@@ -171,7 +170,7 @@ public class RecordReaderUtils {
if (useZeroCopy) {
// ZCR only uses codec for boolean checks.
pool = new ByteBufferAllocatorPool();
- zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+ zcr = RecordReaderUtils.createZeroCopyShim(file, options.getCodec(), pool);
} else {
zcr = null;
}
@@ -228,9 +227,9 @@ public class RecordReaderUtils {
bb.position((int) (offset - range.getOffset()));
bb.limit((int) (bb.position() + stream.getLength()));
indexes[column] = OrcProto.RowIndex.parseFrom(
- InStream.createCodedInputStream("index",
- ReaderImpl.singleton(new BufferChunk(bb, 0)),
- stream.getLength(), codec, bufferSize));
+ InStream.createCodedInputStream(InStream.create("index",
+ new BufferChunk(bb, 0),
+ stream.getLength(), options)));
}
break;
case BLOOM_FILTER:
@@ -240,9 +239,9 @@ public class RecordReaderUtils {
bb.position((int) (offset - range.getOffset()));
bb.limit((int) (bb.position() + stream.getLength()));
bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom
- (InStream.createCodedInputStream("bloom_filter",
- ReaderImpl.singleton(new BufferChunk(bb, 0)),
- stream.getLength(), codec, bufferSize));
+ (InStream.createCodedInputStream(InStream.create(
+ "bloom_filter", new BufferChunk(bb, 0),
+ stream.getLength(), options)));
}
break;
default:
@@ -266,8 +265,8 @@ public class RecordReaderUtils {
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
return OrcProto.StripeFooter.parseFrom(
- InStream.createCodedInputStream("footer", ReaderImpl.singleton(
- new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize));
+ InStream.createCodedInputStream(InStream.create("footer",
+ new BufferChunk(tailBuf, 0), tailLength, options)));
}
@Override
@@ -278,9 +277,9 @@ public class RecordReaderUtils {
@Override
public void close() throws IOException {
- if (codec != null) {
- OrcCodecPool.returnCodec(compressionKind, codec);
- codec = null;
+ if (options.getCodec() != null) {
+ OrcCodecPool.returnCodec(compressionKind, options.getCodec());
+ options.withCodec(null);
}
if (pool != null) {
pool.clear();
@@ -313,9 +312,9 @@ public class RecordReaderUtils {
}
try {
DefaultDataReader clone = (DefaultDataReader) super.clone();
- if (codec != null) {
+ if (options.getCodec() != null) {
// Make sure we don't share the same codec between two readers.
- clone.codec = OrcCodecPool.getCodec(clone.compressionKind);
+ clone.options = options.clone();
}
return clone;
} catch (CloneNotSupportedException e) {
@@ -325,7 +324,7 @@ public class RecordReaderUtils {
@Override
public CompressionCodec getCompressionCodec() {
- return codec;
+ return options.getCodec();
}
}
@@ -572,42 +571,49 @@ public class RecordReaderUtils {
}
- static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+ static DiskRangeList getStreamBuffers(DiskRangeList range, long offset,
+ long length) {
// This assumes sorted ranges (as do many other parts of ORC code).
- ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
- if (length == 0) return buffers;
- long streamEnd = offset + length;
- boolean inRange = false;
- while (range != null) {
- if (!inRange) {
- if (range.getEnd() <= offset) {
- range = range.next;
- continue; // Skip until we are in range.
+ BufferChunkList result = new BufferChunkList();
+ if (length != 0) {
+ long streamEnd = offset + length;
+ boolean inRange = false;
+ while (range != null) {
+ if (!inRange) {
+ if (range.getEnd() <= offset) {
+ range = range.next;
+ continue; // Skip until we are in range.
+ }
+ inRange = true;
+ if (range.getOffset() < offset) {
+ // Partial first buffer, add a slice of it.
+ result.add((BufferChunk) range.sliceAndShift(offset,
+ Math.min(streamEnd, range.getEnd()), -offset));
+ if (range.getEnd() >= streamEnd)
+ break; // Partial first buffer is also partial last buffer.
+ range = range.next;
+ continue;
+ }
+ } else if (range.getOffset() >= streamEnd) {
+ break;
}
- inRange = true;
- if (range.getOffset() < offset) {
- // Partial first buffer, add a slice of it.
- buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
- if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
- range = range.next;
- continue;
+ if (range.getEnd() > streamEnd) {
+ // Partial last buffer (may also be the first buffer), add a slice of it.
+ result.add((BufferChunk) range.sliceAndShift(range.getOffset(),
+ streamEnd, -offset));
+ break;
}
- } else if (range.getOffset() >= streamEnd) {
- break;
- }
- if (range.getEnd() > streamEnd) {
- // Partial last buffer (may also be the first buffer), add a slice of it.
- buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
- break;
+ // Buffer that belongs entirely to one stream.
+ // TODO: ideally we would want to reuse the object and remove it from
+ // the list, but we cannot because bufferChunks is also used by
+ // clearStreams for zcr. Create a useless dup.
+ result.add((BufferChunk) range.sliceAndShift(range.getOffset(),
+ range.getEnd(), -offset));
+ if (range.getEnd() == streamEnd) break;
+ range = range.next;
}
- // Buffer that belongs entirely to one stream.
- // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
- // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
- buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
- if (range.getEnd() == streamEnd) break;
- range = range.next;
}
- return buffers;
+ return result.get();
}
static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java b/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
deleted file mode 100644
index da92c62..0000000
--- a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.common.DiskRangeInfo;
-import org.apache.hadoop.hive.common.io.DiskRange;
-
-/**
- * An uncompressed stream whose underlying byte buffer can be set.
- */
-public class SettableUncompressedStream extends InStream.UncompressedStream {
-
- public SettableUncompressedStream(String name, List<DiskRange> input, long length) {
- super(name, input, length);
- setOffset(input);
- }
-
- public void setBuffers(DiskRangeInfo diskRangeInfo) {
- reset(diskRangeInfo.getDiskRanges(), diskRangeInfo.getTotalLength());
- setOffset(diskRangeInfo.getDiskRanges());
- }
-
- private void setOffset(List<DiskRange> list) {
- currentOffset = list.isEmpty() ? 0 : list.get(0).getOffset();
- }
-}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index 452f315..9269eb6 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl;
import io.airlift.compress.snappy.SnappyCompressor;
import io.airlift.compress.snappy.SnappyDecompressor;
+import org.apache.orc.CompressionKind;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -32,7 +33,7 @@ public class SnappyCodec extends AircompressorCodec
HadoopShims.DirectDecompressor decompressShim = null;
SnappyCodec() {
- super(new SnappyCompressor(), new SnappyDecompressor());
+ super(CompressionKind.SNAPPY, new SnappyCompressor(), new SnappyDecompressor());
}
@Override
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d6239f2..4c3c548 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -236,10 +236,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
case SNAPPY:
return new SnappyCodec();
case LZO:
- return new AircompressorCodec(new LzoCompressor(),
+ return new AircompressorCodec(kind, new LzoCompressor(),
new LzoDecompressor());
case LZ4:
- return new AircompressorCodec(new Lz4Compressor(),
+ return new AircompressorCodec(kind, new Lz4Compressor(),
new Lz4Decompressor());
default:
throw new IllegalArgumentException("Unknown compression codec: " +
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
index de1a6bf..19a3728 100644
--- a/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -25,6 +25,7 @@ import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
private static final HadoopShims SHIMS = HadoopShimsFactory.get();
@@ -187,4 +188,9 @@ public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
decompressShim.end();
}
}
+
+ @Override
+ public CompressionKind getKind() {
+ return CompressionKind.ZLIB;
+ }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
new file mode 100644
index 0000000..3d0c48a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StreamOptions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.EncryptionAlgorithm;
+
+import java.security.Key;
+
+/**
+ * The compression and encryption options for writing a stream.
+ */
+public class StreamOptions {
+ private CompressionCodec codec;
+ private final int bufferSize;
+ private EncryptionAlgorithm algorithm;
+ private Key key;
+ private byte[] iv;
+
+ /**
+ * An option object with the given buffer size set.
+ * @param bufferSize the size of the buffers.
+ */
+ public StreamOptions(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ /**
+ * Compress using the given codec.
+ * @param codec the codec to compress with
+ * @return this
+ */
+ public StreamOptions withCodec(CompressionCodec codec) {
+ this.codec = codec;
+ return this;
+ }
+
+ public StreamOptions withEncryption(EncryptionAlgorithm algorithm,
+ Key key,
+ byte[] iv) {
+ this.algorithm = algorithm;
+ this.key = key;
+ this.iv = iv;
+ return this;
+ }
+
+ public CompressionCodec getCodec() {
+ return codec;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public boolean isEncrypted() {
+ return key != null;
+ }
+
+ public Key getKey() {
+ return key;
+ }
+
+ public EncryptionAlgorithm getAlgorithm() {
+ return algorithm;
+ }
+
+ public byte[] getIv() {
+ return iv;
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
index f7a2a5c..03c379e 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -48,8 +48,8 @@ public class TestBitFieldReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
BitFieldReader in = new BitFieldReader(InStream.create("test",
- new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
- codec, 500));
+ new BufferChunk(inBuf, 0), inBuf.remaining(),
+ InStream.options().withCodec(codec).withBufferSize(500)));
for(int i=0; i < COUNT; ++i) {
int x = in.next();
if (i < COUNT / 2) {
@@ -96,8 +96,8 @@ public class TestBitFieldReader {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), null, 100));
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new BufferChunk(inBuf, 0), inBuf.remaining()));
for(int i=0; i < COUNT; i += 5) {
int x = (int) in.next();
if (i < COUNT/2) {
@@ -133,8 +133,8 @@ public class TestBitFieldReader {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
- new long[]{0}, inBuf.remaining(), null, 100));
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new BufferChunk(inBuf, 0), inBuf.remaining()));
in.seek(posn);
in.skip(10);
for(int r = 210; r < COUNT; ++r) {
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestBitPack.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestBitPack.java b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
index f2d3d64..3eba3e6 100644
--- a/java/core/src/test/org/apache/orc/impl/TestBitPack.java
+++ b/java/core/src/test/org/apache/orc/impl/TestBitPack.java
@@ -108,8 +108,9 @@ public class TestBitPack {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
long[] buff = new long[SIZE];
- utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf },
- new long[] { 0 }, inBuf.remaining(), null, SIZE));
+ utils.readInts(buff, 0, SIZE, fixedWidth,
+ InStream.create("test", new BufferChunk(inBuf,0),
+ inBuf.remaining()));
for (int i = 0; i < SIZE; i++) {
buff[i] = utils.zigzagDecode(buff[i]);
}
http://git-wip-us.apache.org/repos/asf/orc/blob/edbb9673/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java b/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
new file mode 100644
index 0000000..203d3e7
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestCryptoUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.EncryptionAlgorithm;
+import org.apache.orc.OrcProto;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestCryptoUtils {
+
+ @Test
+ public void testCreateStreamIv() throws Exception {
+ byte[] iv = CryptoUtils.createIvForStream(EncryptionAlgorithm.AES_128,
+ new StreamName(0x234567,
+ OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), 0x123456);
+ assertEquals(16, iv.length);
+ assertEquals(0x23, iv[0]);
+ assertEquals(0x45, iv[1]);
+ assertEquals(0x67, iv[2]);
+ assertEquals(0x0, iv[3]);
+ assertEquals(0x8, iv[4]);
+ assertEquals(0x12, iv[5]);
+ assertEquals(0x34, iv[6]);
+ assertEquals(0x56, iv[7]);
+ }
+}