You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/12/12 00:28:07 UTC
[10/16] hive git commit: HIVE-11890. Create ORC submodule. (omalley
reviewed by prasanthj)
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/SettableUncompressedStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/SettableUncompressedStream.java b/orc/src/java/org/apache/orc/impl/SettableUncompressedStream.java
new file mode 100644
index 0000000..f9e29eb
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/SettableUncompressedStream.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRangeInfo;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.orc.impl.InStream;
+
+/**
+ * An uncompressed stream whose underlying byte buffer can be set.
+ *
+ * After construction and after every {@link #setBuffers(DiskRangeInfo)} call,
+ * the inherited currentOffset is moved to the offset of the first disk range,
+ * or to 0 when there are no ranges.
+ */
+public class SettableUncompressedStream extends InStream.UncompressedStream {
+
+  /**
+   * @param name the stream name, passed unchanged to the parent stream
+   * @param input the disk ranges holding the stream's bytes
+   * @param length the total length passed to the parent stream
+   */
+  public SettableUncompressedStream(String name, List<DiskRange> input, long length) {
+    super(name, input, length);
+    setOffset(input);
+  }
+
+  /**
+   * Swap in a new set of buffers by resetting the parent stream with the
+   * ranges and total length carried by {@code diskRangeInfo}.
+   *
+   * @param diskRangeInfo the replacement ranges and their total length
+   */
+  public void setBuffers(DiskRangeInfo diskRangeInfo) {
+    List<DiskRange> ranges = diskRangeInfo.getDiskRanges();
+    reset(ranges, diskRangeInfo.getTotalLength());
+    setOffset(ranges);
+  }
+
+  /**
+   * Point currentOffset at the first range's offset, or 0 for an empty list.
+   */
+  private void setOffset(List<DiskRange> list) {
+    if (list.isEmpty()) {
+      currentOffset = 0;
+    } else {
+      currentOffset = list.get(0).getOffset();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/SnappyCodec.java b/orc/src/java/org/apache/orc/impl/SnappyCodec.java
new file mode 100644
index 0000000..dd4f30c
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+import org.iq80.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Snappy implementation of {@link CompressionCodec}. Heap buffers are handled
+ * with org.iq80.snappy; direct buffers are delegated to the Hadoop shims.
+ */
+public class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+  private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+
+  // Cached answer of isAvailable(); null until the first call.
+  Boolean direct = null;
+
+  /**
+   * Compress the remaining bytes of {@code in}.
+   *
+   * The compressed bytes are written starting at {@code out}'s position. Any
+   * bytes that do not fit are written starting at {@code overflow}'s position.
+   * All buffers are accessed through array(), so they must be array-backed.
+   * The position of {@code in} is not changed.
+   *
+   * @param in the bytes to compress
+   * @param out the primary destination
+   * @param overflow the secondary destination; may be null
+   * @return true if the compressed form was written and is smaller than the
+   *     input; false if compression did not save space or the compressed
+   *     bytes do not fit in out plus overflow (nothing is written then)
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    int inBytes = in.remaining();
+    // Snappy cannot write into two buffers, so compress into a scratch array
+    // and copy. A Snappy API with overflow support would avoid this copy.
+    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
+    int outBytes =
+        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+            compressed, 0);
+    if (outBytes >= inBytes) {
+      // compression did not save any space
+      return false;
+    }
+    int remaining = out.remaining();
+    if (remaining >= outBytes) {
+      System.arraycopy(compressed, 0, out.array(),
+          out.arrayOffset() + out.position(), outBytes);
+      out.position(out.position() + outBytes);
+      return true;
+    }
+    int spill = outBytes - remaining;
+    if (overflow == null || overflow.remaining() < spill) {
+      // Not enough room in out plus overflow: report failure rather than
+      // throwing a NullPointerException or an index/position exception.
+      return false;
+    }
+    System.arraycopy(compressed, 0, out.array(),
+        out.arrayOffset() + out.position(), remaining);
+    out.position(out.limit());
+    System.arraycopy(compressed, remaining, overflow.array(),
+        overflow.arrayOffset() + overflow.position(), spill);
+    overflow.position(overflow.position() + spill);
+    return true;
+  }
+
+  /**
+   * Decompress the remaining bytes of {@code in} into {@code out}, starting at
+   * out's position, then flip {@code out} for reading. When both buffers are
+   * direct, the work is delegated to {@link #directDecompress}.
+   */
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if (in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+    int inOffset = in.position();
+    int uncompressLen =
+        Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
+            in.limit() - inOffset, out.array(), out.arrayOffset() + out.position());
+    out.position(uncompressLen + out.position());
+    out.flip();
+  }
+
+  /**
+   * Report whether the shims provide a direct Snappy decompressor. The answer
+   * is computed once and cached. An UnsatisfiedLinkError counts as
+   * "not available".
+   */
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      try {
+        direct = SHIMS.getDirectDecompressor(
+            HadoopShims.DirectCompressionType.SNAPPY) != null;
+      } catch (UnsatisfiedLinkError ule) {
+        direct = false;
+      }
+    }
+    return direct;
+  }
+
+  /**
+   * Decompress using the shim's direct decompressor, then flip {@code out}
+   * for reading.
+   *
+   * @throws IOException if no direct Snappy decompressor is available
+   */
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    HadoopShims.DirectDecompressor decompressShim =
+        SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.SNAPPY);
+    if (decompressShim == null) {
+      throw new IOException("Direct Snappy decompression is not available");
+    }
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
+
+  /**
+   * Snappy has no tunable settings, so every modifier is ignored.
+   *
+   * @return this codec
+   */
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    // snappy allows no modifications
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/StreamName.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/StreamName.java b/orc/src/java/org/apache/orc/impl/StreamName.java
new file mode 100644
index 0000000..b3fd145
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/StreamName.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcProto;
+
+/**
+ * The name of a stream within a stripe.
+ */
+public class StreamName implements Comparable<StreamName> {
+ private final int column;
+ private final OrcProto.Stream.Kind kind;
+
+ public static enum Area {
+ DATA, INDEX
+ }
+
+ public StreamName(int column, OrcProto.Stream.Kind kind) {
+ this.column = column;
+ this.kind = kind;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj != null && obj instanceof StreamName) {
+ StreamName other = (StreamName) obj;
+ return other.column == column && other.kind == kind;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int compareTo(StreamName streamName) {
+ if (streamName == null) {
+ return -1;
+ }
+ Area area = getArea(kind);
+ Area otherArea = streamName.getArea(streamName.kind);
+ if (area != otherArea) {
+ return -area.compareTo(otherArea);
+ }
+ if (column != streamName.column) {
+ return column < streamName.column ? -1 : 1;
+ }
+ return kind.compareTo(streamName.kind);
+ }
+
+ public int getColumn() {
+ return column;
+ }
+
+ public OrcProto.Stream.Kind getKind() {
+ return kind;
+ }
+
+ public Area getArea() {
+ return getArea(kind);
+ }
+
+ public static Area getArea(OrcProto.Stream.Kind kind) {
+ switch (kind) {
+ case ROW_INDEX:
+ case DICTIONARY_COUNT:
+ case BLOOM_FILTER:
+ return Area.INDEX;
+ default:
+ return Area.DATA;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Stream for column " + column + " kind " + kind;
+ }
+
+ @Override
+ public int hashCode() {
+ return column * 101 + kind.getNumber();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/StringRedBlackTree.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/StringRedBlackTree.java b/orc/src/java/org/apache/orc/impl/StringRedBlackTree.java
new file mode 100644
index 0000000..c353ab0
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/StringRedBlackTree.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Text;
+import org.apache.orc.impl.DynamicByteArray;
+import org.apache.orc.impl.DynamicIntArray;
+import org.apache.orc.impl.RedBlackTree;
+
+/**
+ * A red-black tree that stores strings. The strings are stored as UTF-8 bytes
+ * and an offset for each entry.
+ */
+public class StringRedBlackTree extends RedBlackTree {
+  // Bytes of every distinct key, appended one after another.
+  private final DynamicByteArray byteArray = new DynamicByteArray();
+  // keyOffsets.get(i) is where key i starts in byteArray.
+  private final DynamicIntArray keyOffsets;
+  // Scratch holder for the key currently being added/compared.
+  private final Text newKey = new Text();
+
+  /**
+   * @param initialCapacity initial capacity for the tree and the offset array
+   */
+  public StringRedBlackTree(int initialCapacity) {
+    super(initialCapacity);
+    keyOffsets = new DynamicIntArray(initialCapacity);
+  }
+
+  /**
+   * Add a string.
+   * @return the base tree's lastAdd position for this key (presumably the
+   *     existing position when the key was already present — confirm in
+   *     RedBlackTree)
+   */
+  public int add(String value) {
+    newKey.set(value);
+    return addNewKey();
+  }
+
+  /**
+   * Insert newKey into the tree. Its bytes are appended to byteArray only
+   * when the base tree reports that the key is new.
+   */
+  private int addNewKey() {
+    // if the newKey is actually new, add it to our byteArray and store the offset & length
+    if (add()) {
+      int len = newKey.getLength();
+      keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len));
+    }
+    return lastAdd;
+  }
+
+  /** Add a key given as Text; see {@link #add(String)}. */
+  public int add(Text value) {
+    newKey.set(value);
+    return addNewKey();
+  }
+
+  /** Add a key given as a byte range; see {@link #add(String)}. */
+  public int add(byte[] bytes, int offset, int length) {
+    newKey.set(bytes, offset, length);
+    return addNewKey();
+  }
+
+  /**
+   * Offset one past the last byte of the key at {@code position}. Keys are
+   * stored back to back, so a key ends where the next one starts. The last
+   * key ends at the end of byteArray.
+   */
+  private int getEnd(int position) {
+    return position + 1 == keyOffsets.size()
+        ? byteArray.size()
+        : keyOffsets.get(position + 1);
+  }
+
+  /** Compare newKey against the stored key at {@code position}. */
+  @Override
+  protected int compareValue(int position) {
+    int start = keyOffsets.get(position);
+    return byteArray.compare(newKey.getBytes(), 0, newKey.getLength(),
+        start, getEnd(position) - start);
+  }
+
+  /**
+   * The information about each node.
+   */
+  public interface VisitorContext {
+    /**
+     * Get the position where the key was originally added.
+     * @return the number returned by add.
+     */
+    int getOriginalPosition();
+
+    /**
+     * Write the bytes for the string to the given output stream.
+     * @param out the stream to write to.
+     * @throws IOException
+     */
+    void writeBytes(OutputStream out) throws IOException;
+
+    /**
+     * Get the original string.
+     * @return the string
+     */
+    Text getText();
+
+    /**
+     * Get the number of bytes.
+     * @return the string's length in bytes
+     */
+    int getLength();
+  }
+
+  /**
+   * The interface for visitors.
+   */
+  public interface Visitor {
+    /**
+     * Called once for each node of the tree in sort order.
+     * @param context the information about each node
+     * @throws IOException
+     */
+    void visit(VisitorContext context) throws IOException;
+  }
+
+  /**
+   * A reusable context that is re-pointed at each node during a visit. The
+   * Text returned by getText() is shared across nodes.
+   */
+  private class VisitorContextImpl implements VisitorContext {
+    private int originalPosition;
+    private int start;
+    private int end;
+    private final Text text = new Text();
+
+    @Override
+    public int getOriginalPosition() {
+      return originalPosition;
+    }
+
+    @Override
+    public Text getText() {
+      byteArray.setText(text, start, end - start);
+      return text;
+    }
+
+    @Override
+    public void writeBytes(OutputStream out) throws IOException {
+      byteArray.write(out, start, end - start);
+    }
+
+    @Override
+    public int getLength() {
+      return end - start;
+    }
+
+    /** Point this context at the key stored at {@code position}. */
+    void setPosition(int position) {
+      originalPosition = position;
+      start = keyOffsets.get(position);
+      end = getEnd(position);
+    }
+  }
+
+  /** In-order traversal: left subtree, node, right subtree. */
+  private void recurse(int node, Visitor visitor, VisitorContextImpl context
+                       ) throws IOException {
+    if (node != NULL) {
+      recurse(getLeft(node), visitor, context);
+      context.setPosition(node);
+      visitor.visit(context);
+      recurse(getRight(node), visitor, context);
+    }
+  }
+
+  /**
+   * Visit all of the nodes in the tree in sorted order.
+   * @param visitor the action to be applied to each node
+   * @throws IOException
+   */
+  public void visit(Visitor visitor) throws IOException {
+    recurse(root, visitor, new VisitorContextImpl());
+  }
+
+  /**
+   * Reset the table to empty.
+   */
+  @Override
+  public void clear() {
+    super.clear();
+    byteArray.clear();
+    keyOffsets.clear();
+  }
+
+  /**
+   * Set {@code result} to the bytes of the key that was added at
+   * {@code originalPosition}.
+   */
+  public void getText(Text result, int originalPosition) {
+    int offset = keyOffsets.get(originalPosition);
+    byteArray.setText(result, offset, getEnd(originalPosition) - offset);
+  }
+
+  /**
+   * Get the size of the character data in the table.
+   * @return the bytes used by the table
+   */
+  public int getCharacterSize() {
+    return byteArray.size();
+  }
+
+  /**
+   * Calculate the approximate size in memory.
+   * @return the number of bytes used in storing the tree.
+   */
+  @Override
+  public long getSizeInBytes() {
+    return byteArray.getSizeInBytes() + keyOffsets.getSizeInBytes() +
+        super.getSizeInBytes();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/ZlibCodec.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/ZlibCodec.java b/orc/src/java/org/apache/orc/impl/ZlibCodec.java
new file mode 100644
index 0000000..5f648a8
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/ZlibCodec.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.orc.CompressionCodec;
+
+/**
+ * Raw-deflate (no zlib header) implementation of {@link CompressionCodec}.
+ * Heap buffers use java.util.zip; direct buffers are delegated to the Hadoop
+ * shims.
+ */
+public class ZlibCodec implements CompressionCodec, DirectDecompressionCodec {
+  private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+  // Cached answer of isAvailable(); null until the first call.
+  private Boolean direct = null;
+
+  private final int level;
+  private final int strategy;
+
+  /** Create a codec with the default deflate level and strategy. */
+  public ZlibCodec() {
+    this(Deflater.DEFAULT_COMPRESSION, Deflater.DEFAULT_STRATEGY);
+  }
+
+  private ZlibCodec(int level, int strategy) {
+    this.level = level;
+    this.strategy = strategy;
+  }
+
+  /**
+   * Deflate the remaining bytes of {@code in} into {@code out}. When out fills
+   * up, deflation continues into {@code overflow}.
+   *
+   * @param in array-backed input; its position is not changed
+   * @param out array-backed primary destination
+   * @param overflow array-backed secondary destination; may be null
+   * @return true only if the compressed form is strictly smaller than the
+   *     input and was written completely; on false, out/overflow may hold
+   *     partial output and the caller should discard it
+   */
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    Deflater deflater = new Deflater(level, true);
+    try {
+      deflater.setStrategy(strategy);
+      int length = in.remaining();
+      deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+      deflater.finish();
+      ByteBuffer target = out;
+      boolean usingOverflow = false;
+      int outSize = 0;
+      int offset = target.arrayOffset() + target.position();
+      while (!deflater.finished() && (length > outSize)) {
+        int size = deflater.deflate(target.array(), offset, target.remaining());
+        target.position(size + target.position());
+        outSize += size;
+        offset += size;
+        // if we run out of space in the out buffer, use the overflow
+        if (target.remaining() == 0) {
+          if (overflow == null) {
+            return false;
+          }
+          if (usingOverflow) {
+            // Both buffers are full. Further deflate calls cannot produce
+            // output, so stop instead of looping forever.
+            return deflater.finished() && length > outSize;
+          }
+          target = overflow;
+          offset = target.arrayOffset() + target.position();
+          usingOverflow = true;
+        }
+      }
+      return length > outSize;
+    } finally {
+      // release native zlib memory even if deflate throws
+      deflater.end();
+    }
+  }
+
+  /**
+   * Inflate the remaining bytes of {@code in} into {@code out}, starting at
+   * out's position. Afterwards out is flipped for reading and in's position
+   * is moved to its limit. When both buffers are direct, the work is
+   * delegated to {@link #directDecompress}.
+   *
+   * @throws IOException if the compressed data is malformed
+   */
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+
+    if (in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+
+    Inflater inflater = new Inflater(true);
+    try {
+      inflater.setInput(in.array(), in.arrayOffset() + in.position(),
+          in.remaining());
+      while (!(inflater.finished() || inflater.needsDictionary() ||
+          inflater.needsInput())) {
+        try {
+          int count = inflater.inflate(out.array(),
+              out.arrayOffset() + out.position(),
+              out.remaining());
+          out.position(count + out.position());
+        } catch (DataFormatException dfe) {
+          throw new IOException("Bad compression data", dfe);
+        }
+      }
+      out.flip();
+    } finally {
+      // release native zlib memory even on malformed input
+      inflater.end();
+    }
+    in.position(in.limit());
+  }
+
+  /**
+   * Report whether the shims provide a direct header-less zlib decompressor.
+   * The answer is computed once and cached. An UnsatisfiedLinkError counts
+   * as "not available".
+   */
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      // see nowrap option in new Inflater(boolean) which disables zlib headers
+      try {
+        direct = SHIMS.getDirectDecompressor(
+            HadoopShims.DirectCompressionType.ZLIB_NOHEADER) != null;
+      } catch (UnsatisfiedLinkError ule) {
+        direct = false;
+      }
+    }
+    return direct;
+  }
+
+  /**
+   * Decompress using the shim's direct decompressor, then flip {@code out}
+   * for reading.
+   *
+   * @throws IOException if no direct zlib decompressor is available
+   */
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    HadoopShims.DirectDecompressor decompressShim =
+        SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB_NOHEADER);
+    if (decompressShim == null) {
+      throw new IOException("Direct zlib decompression is not available");
+    }
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
+
+  /**
+   * Return a codec whose level and strategy reflect {@code modifiers}. When
+   * several modifiers affect the same setting, the last one in iteration
+   * order wins.
+   *
+   * @param modifiers the requested adjustments; null returns this codec
+   * @return a new codec, or this codec when modifiers is null
+   */
+  @Override
+  public CompressionCodec modify(@Nullable EnumSet<Modifier> modifiers) {
+
+    if (modifiers == null) {
+      return this;
+    }
+
+    int l = this.level;
+    int s = this.strategy;
+
+    for (Modifier m : modifiers) {
+      switch (m) {
+        case BINARY:
+          /* filtered == less LZ77, more huffman */
+          s = Deflater.FILTERED;
+          break;
+        case TEXT:
+          s = Deflater.DEFAULT_STRATEGY;
+          break;
+        case FASTEST:
+          // deflate_fast looking for 8 byte patterns
+          l = Deflater.BEST_SPEED;
+          break;
+        case FAST:
+          // deflate_fast looking for 16 byte patterns
+          l = Deflater.BEST_SPEED + 1;
+          break;
+        case DEFAULT:
+          // deflate_slow looking for 128 byte patterns
+          l = Deflater.DEFAULT_COMPRESSION;
+          break;
+        default:
+          break;
+      }
+    }
+    return new ZlibCodec(l, s);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/protobuf/orc_proto.proto
----------------------------------------------------------------------
diff --git a/orc/src/protobuf/orc_proto.proto b/orc/src/protobuf/orc_proto.proto
new file mode 100644
index 0000000..0b36794
--- /dev/null
+++ b/orc/src/protobuf/orc_proto.proto
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package orc.proto;
+
+option java_package = "org.apache.orc";
+
+// Carried in ColumnStatistics.intStatistics.
+message IntegerStatistics {
+ optional sint64 minimum = 1;
+ optional sint64 maximum = 2;
+ optional sint64 sum = 3;
+}
+
+// Carried in ColumnStatistics.doubleStatistics.
+message DoubleStatistics {
+ optional double minimum = 1;
+ optional double maximum = 2;
+ optional double sum = 3;
+}
+
+// Carried in ColumnStatistics.stringStatistics.
+message StringStatistics {
+ optional string minimum = 1;
+ optional string maximum = 2;
+ // sum will store the total length of all strings in a stripe
+ optional sint64 sum = 3;
+}
+
+// Carried in ColumnStatistics.bucketStatistics.
+message BucketStatistics {
+ repeated uint64 count = 1 [packed=true];
+}
+
+// Carried in ColumnStatistics.decimalStatistics; all values are stored as strings.
+message DecimalStatistics {
+ optional string minimum = 1;
+ optional string maximum = 2;
+ optional string sum = 3;
+}
+
+// Carried in ColumnStatistics.dateStatistics.
+message DateStatistics {
+ // min,max values saved as days since epoch
+ optional sint32 minimum = 1;
+ optional sint32 maximum = 2;
+}
+
+// Carried in ColumnStatistics.timestampStatistics.
+message TimestampStatistics {
+ // min,max values saved as milliseconds since epoch
+ optional sint64 minimum = 1;
+ optional sint64 maximum = 2;
+}
+
+// Carried in ColumnStatistics.binaryStatistics.
+message BinaryStatistics {
+ // sum will store the total binary blob length in a stripe
+ optional sint64 sum = 1;
+}
+
+// Statistics for one column. Used by RowIndexEntry.statistics,
+// StripeStatistics.colStats and Footer.statistics; the type-specific
+// sub-message fields (intStatistics, doubleStatistics, ...) are all optional.
+message ColumnStatistics {
+ optional uint64 numberOfValues = 1;
+ optional IntegerStatistics intStatistics = 2;
+ optional DoubleStatistics doubleStatistics = 3;
+ optional StringStatistics stringStatistics = 4;
+ optional BucketStatistics bucketStatistics = 5;
+ optional DecimalStatistics decimalStatistics = 6;
+ optional DateStatistics dateStatistics = 7;
+ optional BinaryStatistics binaryStatistics = 8;
+ optional TimestampStatistics timestampStatistics = 9;
+ optional bool hasNull = 10;
+}
+
+// Entries of RowIndex.entry: a packed list of positions plus optional statistics.
+message RowIndexEntry {
+ repeated uint64 positions = 1 [packed=true];
+ optional ColumnStatistics statistics = 2;
+}
+
+message RowIndex {
+ repeated RowIndexEntry entry = 1;
+}
+
+// Entries of BloomFilterIndex.bloomFilter.
+message BloomFilter {
+ optional uint32 numHashFunctions = 1;
+ repeated fixed64 bitset = 2;
+}
+
+message BloomFilterIndex {
+ repeated BloomFilter bloomFilter = 1;
+}
+
+// Entries of StripeFooter.streams.
+message Stream {
+ // if you add new index stream kinds, you need to make sure to update
+ // StreamName to ensure it is added to the stripe in the right area
+ // (StreamName.getArea treats ROW_INDEX, DICTIONARY_COUNT and BLOOM_FILTER
+ // as INDEX streams and every other kind as DATA).
+ enum Kind {
+ PRESENT = 0;
+ DATA = 1;
+ LENGTH = 2;
+ DICTIONARY_DATA = 3;
+ DICTIONARY_COUNT = 4;
+ SECONDARY = 5;
+ ROW_INDEX = 6;
+ BLOOM_FILTER = 7;
+ }
+ optional Kind kind = 1;
+ optional uint32 column = 2;
+ optional uint64 length = 3;
+}
+
+// Entries of StripeFooter.columns.
+message ColumnEncoding {
+ enum Kind {
+ DIRECT = 0;
+ DICTIONARY = 1;
+ DIRECT_V2 = 2;
+ DICTIONARY_V2 = 3;
+ }
+ optional Kind kind = 1;
+ optional uint32 dictionarySize = 2;
+}
+
+// Per-stripe footer: the stripe's streams, column encodings and writer timezone.
+message StripeFooter {
+ repeated Stream streams = 1;
+ repeated ColumnEncoding columns = 2;
+ optional string writerTimezone = 3;
+}
+
+// Entries of Footer.types.
+message Type {
+ enum Kind {
+ BOOLEAN = 0;
+ BYTE = 1;
+ SHORT = 2;
+ INT = 3;
+ LONG = 4;
+ FLOAT = 5;
+ DOUBLE = 6;
+ STRING = 7;
+ BINARY = 8;
+ TIMESTAMP = 9;
+ LIST = 10;
+ MAP = 11;
+ STRUCT = 12;
+ UNION = 13;
+ DECIMAL = 14;
+ DATE = 15;
+ VARCHAR = 16;
+ CHAR = 17;
+ }
+ optional Kind kind = 1;
+ repeated uint32 subtypes = 2 [packed=true];
+ repeated string fieldNames = 3;
+ optional uint32 maximumLength = 4;
+ optional uint32 precision = 5;
+ optional uint32 scale = 6;
+}
+
+// Entries of Footer.stripes.
+message StripeInformation {
+ optional uint64 offset = 1;
+ optional uint64 indexLength = 2;
+ optional uint64 dataLength = 3;
+ optional uint64 footerLength = 4;
+ optional uint64 numberOfRows = 5;
+}
+
+// Entries of Footer.metadata: name/value pairs.
+message UserMetadataItem {
+ optional string name = 1;
+ optional bytes value = 2;
+}
+
+// Entries of Metadata.stripeStats.
+message StripeStatistics {
+ repeated ColumnStatistics colStats = 1;
+}
+
+message Metadata {
+ repeated StripeStatistics stripeStats = 1;
+}
+
+message Footer {
+ optional uint64 headerLength = 1;
+ optional uint64 contentLength = 2;
+ repeated StripeInformation stripes = 3;
+ repeated Type types = 4;
+ repeated UserMetadataItem metadata = 5;
+ optional uint64 numberOfRows = 6;
+ repeated ColumnStatistics statistics = 7;
+ optional uint32 rowIndexStride = 8;
+}
+
+// Recorded in PostScript.compression.
+enum CompressionKind {
+ NONE = 0;
+ ZLIB = 1;
+ SNAPPY = 2;
+ LZO = 3;
+}
+
+// Serialized length must be less than 255 bytes
+message PostScript {
+ optional uint64 footerLength = 1;
+ optional CompressionKind compression = 2;
+ optional uint64 compressionBlockSize = 3;
+ // the version of the file format
+ // [0, 11] = Hive 0.11
+ // [0, 12] = Hive 0.12
+ repeated uint32 version = 4 [packed = true];
+ optional uint64 metadataLength = 5;
+ // Version of the writer:
+ // 0 (or missing) = original
+ // 1 = HIVE-8732 fixed
+ // 2 = HIVE-4243 fixed
+ optional uint32 writerVersion = 6;
+ // Leave this last in the record
+ optional string magic = 8000;
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestBitFieldReader.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestBitFieldReader.java b/orc/src/test/org/apache/orc/impl/TestBitFieldReader.java
new file mode 100644
index 0000000..e4c6f6b
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestBitFieldReader.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+public class TestBitFieldReader {
+
+ public void runSeekTest(CompressionCodec codec) throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ final int COUNT = 16384;
+ BitFieldWriter out = new BitFieldWriter(
+ new OutStream("test", 500, codec, collect), 1);
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[COUNT];
+ for(int i=0; i < COUNT; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ // test runs, non-runs
+ if (i < COUNT / 2) {
+ out.write(i & 1);
+ } else {
+ out.write((i/3) & 1);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
+ codec, 500), 1);
+ for(int i=0; i < COUNT; ++i) {
+ int x = in.next();
+ if (i < COUNT / 2) {
+ assertEquals(i & 1, x);
+ } else {
+ assertEquals((i/3) & 1, x);
+ }
+ }
+ for(int i=COUNT-1; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = in.next();
+ if (i < COUNT / 2) {
+ assertEquals(i & 1, x);
+ } else {
+ assertEquals((i/3) & 1, x);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+ @Test
+ public void testBiggerItems() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ final int COUNT = 16384;
+ BitFieldWriter out = new BitFieldWriter(
+ new OutStream("test", 500, null, collect), 3);
+ for(int i=0; i < COUNT; ++i) {
+ // test runs, non-runs
+ if (i < COUNT / 2) {
+ out.write(i & 7);
+ } else {
+ out.write((i/3) & 7);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
+ null, 500), 3);
+ for(int i=0; i < COUNT; ++i) {
+ int x = in.next();
+ if (i < COUNT / 2) {
+ assertEquals(i & 7, x);
+ } else {
+ assertEquals((i/3) & 7, x);
+ }
+ }
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ BitFieldWriter out = new BitFieldWriter(
+ new OutStream("test", 100, null, collect), 1);
+ final int COUNT = 16384;
+ for(int i=0; i < COUNT; ++i) {
+ if (i < COUNT/2) {
+ out.write(i & 1);
+ } else {
+ out.write((i/3) & 1);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ BitFieldReader in = new BitFieldReader(InStream.create("test", new ByteBuffer[]{inBuf},
+ new long[]{0}, inBuf.remaining(), null, 100), 1);
+ for(int i=0; i < COUNT; i += 5) {
+ int x = (int) in.next();
+ if (i < COUNT/2) {
+ assertEquals(i & 1, x);
+ } else {
+ assertEquals((i/3) & 1, x);
+ }
+ if (i < COUNT - 5) {
+ in.skip(4);
+ }
+ in.skip(0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestBitPack.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestBitPack.java b/orc/src/test/org/apache/orc/impl/TestBitPack.java
new file mode 100644
index 0000000..f2d3d64
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestBitPack.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.primitives.Longs;
+
+public class TestBitPack {
+
+ private static final int SIZE = 100;
+ private static Random rand = new Random(100);
+ Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test"
+ + File.separator + "tmp"));
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
+
+ private long[] deltaEncode(long[] inp) {
+ long[] output = new long[inp.length];
+ SerializationUtils utils = new SerializationUtils();
+ for (int i = 0; i < inp.length; i++) {
+ output[i] = utils.zigzagEncode(inp[i]);
+ }
+ return output;
+ }
+
+ private long nextLong(Random rng, long n) {
+ long bits, val;
+ do {
+ bits = (rng.nextLong() << 1) >>> 1;
+ val = bits % n;
+ } while (bits - val + (n - 1) < 0L);
+ return val;
+ }
+
+ private void runTest(int numBits) throws IOException {
+ long[] inp = new long[SIZE];
+ for (int i = 0; i < SIZE; i++) {
+ long val = 0;
+ if (numBits <= 32) {
+ if (numBits == 1) {
+ val = -1 * rand.nextInt(2);
+ } else {
+ val = rand.nextInt((int) Math.pow(2, numBits - 1));
+ }
+ } else {
+ val = nextLong(rand, (long) Math.pow(2, numBits - 2));
+ }
+ if (val % 2 == 0) {
+ val = -val;
+ }
+ inp[i] = val;
+ }
+ long[] deltaEncoded = deltaEncode(inp);
+ long minInput = Collections.min(Longs.asList(deltaEncoded));
+ long maxInput = Collections.max(Longs.asList(deltaEncoded));
+ long rangeInput = maxInput - minInput;
+ SerializationUtils utils = new SerializationUtils();
+ int fixedWidth = utils.findClosestNumBits(rangeInput);
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ OutStream output = new OutStream("test", SIZE, null, collect);
+ utils.writeInts(deltaEncoded, 0, deltaEncoded.length, fixedWidth, output);
+ output.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ long[] buff = new long[SIZE];
+ utils.readInts(buff, 0, SIZE, fixedWidth, InStream.create("test", new ByteBuffer[] { inBuf },
+ new long[] { 0 }, inBuf.remaining(), null, SIZE));
+ for (int i = 0; i < SIZE; i++) {
+ buff[i] = utils.zigzagDecode(buff[i]);
+ }
+ assertEquals(numBits, fixedWidth);
+ assertArrayEquals(inp, buff);
+ }
+
+ @Test
+ public void test01BitPacking1Bit() throws IOException {
+ runTest(1);
+ }
+
+ @Test
+ public void test02BitPacking2Bit() throws IOException {
+ runTest(2);
+ }
+
+ @Test
+ public void test03BitPacking3Bit() throws IOException {
+ runTest(3);
+ }
+
+ @Test
+ public void test04BitPacking4Bit() throws IOException {
+ runTest(4);
+ }
+
+ @Test
+ public void test05BitPacking5Bit() throws IOException {
+ runTest(5);
+ }
+
+ @Test
+ public void test06BitPacking6Bit() throws IOException {
+ runTest(6);
+ }
+
+ @Test
+ public void test07BitPacking7Bit() throws IOException {
+ runTest(7);
+ }
+
+ @Test
+ public void test08BitPacking8Bit() throws IOException {
+ runTest(8);
+ }
+
+ @Test
+ public void test09BitPacking9Bit() throws IOException {
+ runTest(9);
+ }
+
+ @Test
+ public void test10BitPacking10Bit() throws IOException {
+ runTest(10);
+ }
+
+ @Test
+ public void test11BitPacking11Bit() throws IOException {
+ runTest(11);
+ }
+
+ @Test
+ public void test12BitPacking12Bit() throws IOException {
+ runTest(12);
+ }
+
+ @Test
+ public void test13BitPacking13Bit() throws IOException {
+ runTest(13);
+ }
+
+ @Test
+ public void test14BitPacking14Bit() throws IOException {
+ runTest(14);
+ }
+
+ @Test
+ public void test15BitPacking15Bit() throws IOException {
+ runTest(15);
+ }
+
+ @Test
+ public void test16BitPacking16Bit() throws IOException {
+ runTest(16);
+ }
+
+ @Test
+ public void test17BitPacking17Bit() throws IOException {
+ runTest(17);
+ }
+
+ @Test
+ public void test18BitPacking18Bit() throws IOException {
+ runTest(18);
+ }
+
+ @Test
+ public void test19BitPacking19Bit() throws IOException {
+ runTest(19);
+ }
+
+ @Test
+ public void test20BitPacking20Bit() throws IOException {
+ runTest(20);
+ }
+
+ @Test
+ public void test21BitPacking21Bit() throws IOException {
+ runTest(21);
+ }
+
+ @Test
+ public void test22BitPacking22Bit() throws IOException {
+ runTest(22);
+ }
+
+ @Test
+ public void test23BitPacking23Bit() throws IOException {
+ runTest(23);
+ }
+
+ @Test
+ public void test24BitPacking24Bit() throws IOException {
+ runTest(24);
+ }
+
+ @Test
+ public void test26BitPacking26Bit() throws IOException {
+ runTest(26);
+ }
+
+ @Test
+ public void test28BitPacking28Bit() throws IOException {
+ runTest(28);
+ }
+
+ @Test
+ public void test30BitPacking30Bit() throws IOException {
+ runTest(30);
+ }
+
+ @Test
+ public void test32BitPacking32Bit() throws IOException {
+ runTest(32);
+ }
+
+ @Test
+ public void test40BitPacking40Bit() throws IOException {
+ runTest(40);
+ }
+
+ @Test
+ public void test48BitPacking48Bit() throws IOException {
+ runTest(48);
+ }
+
+ @Test
+ public void test56BitPacking56Bit() throws IOException {
+ runTest(56);
+ }
+
+ @Test
+ public void test64BitPacking64Bit() throws IOException {
+ runTest(64);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestDynamicArray.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestDynamicArray.java b/orc/src/test/org/apache/orc/impl/TestDynamicArray.java
new file mode 100644
index 0000000..af583f7
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestDynamicArray.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.util.Random;
+
+import org.apache.orc.impl.DynamicByteArray;
+import org.apache.orc.impl.DynamicIntArray;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link DynamicByteArray} and {@link DynamicIntArray}.
+ */
+public class TestDynamicArray {
+
+  @Test
+  public void testByteArray() throws Exception {
+    DynamicByteArray dba = new DynamicByteArray(3, 10);
+    dba.add((byte) 0);
+    dba.add((byte) 1);
+    // set() past the current end (index 3 before index 2) grows the array
+    dba.set(3, (byte) 3);
+    dba.set(2, (byte) 2);
+    dba.add((byte) 4);
+    assertEquals("{0,1,2,3,4}", dba.toString());
+    assertEquals(5, dba.size());
+
+    // Judging from the assertions below, compare(bytes, bytesOffset,
+    // bytesLength, arrayOffset, arrayLength) returns the sign of comparing
+    // bytes[...] against this array's range, with a proper prefix sorting
+    // first.
+    byte[] val = new byte[0];
+    assertEquals(0, dba.compare(val, 0, 0, 2, 0));
+    assertEquals(-1, dba.compare(val, 0, 0, 2, 1));
+    val = new byte[]{3,42};
+    assertEquals(1, dba.compare(val, 0, 1, 2, 0));
+    assertEquals(1, dba.compare(val, 0, 1, 2, 1));
+    assertEquals(0, dba.compare(val, 0, 1, 3, 1));
+    assertEquals(-1, dba.compare(val, 0, 1, 3, 2));
+    assertEquals(1, dba.compare(val, 0, 2, 3, 1));
+
+    // Append every byte value -128..127; they occupy indices 5..260.
+    val = new byte[256];
+    for(int b=-128; b < 128; ++b) {
+      dba.add((byte) b);
+      val[b+128] = (byte) b;
+    }
+    assertEquals(0, dba.compare(val, 0, 256, 5, 256));
+    // Bytes compare as unsigned: -128 (0x80) sorts above 0.
+    assertEquals(1, dba.compare(val, 0, 1, 0, 1));
+    assertEquals(1, dba.compare(val, 254, 1, 0, 1));
+    assertEquals(1, dba.compare(val, 120, 1, 64, 1));
+
+    // Append a 1024-byte random block at indices 261..1284. nextBytes fills
+    // the entire array, so one call is enough (refilling it in a loop only
+    // wasted work).
+    val = new byte[1024];
+    Random rand = new Random(1701);
+    rand.nextBytes(val);
+    dba.add(val, 0, 1024);
+    assertEquals(1285, dba.size());
+    assertEquals(0, dba.compare(val, 0, 1024, 261, 1024));
+  }
+
+  @Test
+  public void testIntArray() throws Exception {
+    DynamicIntArray dia = new DynamicIntArray(10);
+    for(int i=0; i < 10000; ++i) {
+      dia.add(2*i);
+    }
+    assertEquals(10000, dia.size());
+    for(int i=0; i < 10000; ++i) {
+      assertEquals(2*i, dia.get(i));
+    }
+    dia.clear();
+    assertEquals(0, dia.size());
+    dia.add(3);
+    dia.add(12);
+    dia.add(65);
+    assertEquals("{3,12,65}", dia.toString());
+    // increment() past the end (indices 3 and 4) extends the array, treating
+    // the new slots as starting from 0
+    for(int i=0; i < 5; ++i) {
+      dia.increment(i, 3);
+    }
+    assertEquals("{6,15,68,3,3}", dia.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestInStream.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestInStream.java b/orc/src/test/org/apache/orc/impl/TestInStream.java
new file mode 100644
index 0000000..9e65345
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestInStream.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+/**
+ * Tests for {@link InStream} reading data written by {@link OutStream}, both
+ * uncompressed and with {@link ZlibCodec}, from single and disjoint buffers.
+ * The nested collectors are package-visible so other tests can reuse them.
+ */
+public class TestInStream {
+
+  /** Collects every buffer an {@link OutStream} emits into one byte array. */
+  static class OutputCollector implements OutStream.OutputReceiver {
+    DynamicByteArray buffer = new DynamicByteArray();
+
+    @Override
+    public void output(ByteBuffer buf) throws IOException {
+      // Copy the readable region [position, limit) of the backing array;
+      // ByteBuffer.array() requires an array-backed (heap) buffer.
+      buffer.add(buf.array(), buf.arrayOffset() + buf.position(),
+          buf.remaining());
+    }
+  }
+
+  /**
+   * Records the positions a writer reports and replays them, in the same
+   * order, when a reader seeks. Call {@link #reset()} to replay from the
+   * first recorded value again.
+   */
+  static class PositionCollector
+      implements PositionProvider, PositionRecorder {
+    private final List<Long> positions = new ArrayList<Long>();
+    private int index = 0;
+
+    @Override
+    public long getNext() {
+      return positions.get(index++);
+    }
+
+    @Override
+    public void addPosition(long offset) {
+      positions.add(offset);
+    }
+
+    public void reset() {
+      index = 0;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder("position: ");
+      for(int i=0; i < positions.size(); ++i) {
+        if (i != 0) {
+          builder.append(", ");
+        }
+        builder.append(positions.get(i));
+      }
+      return builder.toString();
+    }
+  }
+
+  @Test
+  public void testUncompressed() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    OutStream out = new OutStream("test", 100, null, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      out.write(i);
+    }
+    out.flush();
+    // Without a codec each write(int) is stored as exactly one byte.
+    assertEquals(1024, collect.buffer.size());
+    for(int i=0; i < 1024; ++i) {
+      assertEquals((byte) i, collect.buffer.get(i));
+    }
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+        new long[]{0}, inBuf.remaining(), null, 100);
+    assertEquals("uncompressed stream test position: 0 length: 1024" +
+                 " range: 0 offset: 0 limit: 0",
+                 in.toString());
+    for(int i=0; i < 1024; ++i) {
+      int x = in.read();
+      assertEquals(i & 0xff, x);
+    }
+    // Seek back to every recorded position, last to first.
+    for(int i=1023; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(i & 0xff, in.read());
+    }
+  }
+
+  @Test
+  public void testCompressed() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    OutStream out = new OutStream("test", 300, codec, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      out.write(i);
+    }
+    out.flush();
+    assertEquals("test", out.toString());
+    assertEquals(961, collect.buffer.size());
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+        new long[]{0}, inBuf.remaining(), codec, 300);
+    assertEquals("compressed stream test position: 0 length: 961 range: 0" +
+                 " offset: 0 limit: 0 range 0 = 0 to 961",
+                 in.toString());
+    for(int i=0; i < 1024; ++i) {
+      int x = in.read();
+      assertEquals(i & 0xff, x);
+    }
+    assertEquals(0, in.available());
+    for(int i=1023; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(i & 0xff, in.read());
+    }
+  }
+
+  @Test
+  public void testCorruptStream() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    OutStream out = new OutStream("test", 500, codec, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      out.write(i);
+    }
+    out.flush();
+
+    // now try to read the stream with a buffer that is too small
+    // (written with a 500-byte buffer, read with a 100-byte buffer)
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+        new long[]{0}, inBuf.remaining(), codec, 100);
+    byte[] contents = new byte[1024];
+    try {
+      in.read(contents);
+      fail();
+    } catch(IllegalArgumentException iae) {
+      // EXPECTED
+    }
+
+    // make a corrupted header: a two-byte stream, read with the codec
+    inBuf.clear();
+    inBuf.put((byte) 32);
+    inBuf.put((byte) 0);
+    inBuf.flip();
+    in = InStream.create("test2", new ByteBuffer[]{inBuf}, new long[]{0},
+        inBuf.remaining(), codec, 300);
+    try {
+      in.read();
+      fail();
+    } catch (IllegalStateException ise) {
+      // EXPECTED
+    }
+  }
+
+  @Test
+  public void testDisjointBuffers() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    OutStream out = new OutStream("test", 400, codec, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    DataOutput stream = new DataOutputStream(out);
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      stream.writeInt(i);
+    }
+    out.flush();
+    assertEquals("test", out.toString());
+    assertEquals(1674, collect.buffer.size());
+    // Split the 1674 compressed bytes into ranges [0,483), [483,1625) and
+    // [1625,1674), each in its own (oversized) buffer.
+    ByteBuffer[] inBuf = new ByteBuffer[3];
+    inBuf[0] = ByteBuffer.allocate(500);
+    inBuf[1] = ByteBuffer.allocate(1200);
+    inBuf[2] = ByteBuffer.allocate(500);
+    collect.buffer.setByteBuffer(inBuf[0], 0, 483);
+    collect.buffer.setByteBuffer(inBuf[1], 483, 1625 - 483);
+    collect.buffer.setByteBuffer(inBuf[2], 1625, 1674 - 1625);
+
+    for(int i=0; i < inBuf.length; ++i) {
+      inBuf[i].flip();
+    }
+    InStream in = InStream.create("test", inBuf,
+        new long[]{0,483, 1625}, 1674, codec, 400);
+    // NOTE(review): the number after "to" in each range is that range's
+    // length (1625 - 483 = 1142, 1674 - 1625 = 49), not its end offset.
+    assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
+                 " offset: 0 limit: 0 range 0 = 0 to 483;" +
+                 " range 1 = 483 to 1142; range 2 = 1625 to 49",
+                 in.toString());
+    DataInputStream inStream = new DataInputStream(in);
+    for(int i=0; i < 1024; ++i) {
+      int x = inStream.readInt();
+      assertEquals(i, x);
+    }
+    assertEquals(0, in.available());
+    for(int i=1023; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(i, inStream.readInt());
+    }
+
+    // Without range 0: seeking to value 303 and reading to the end must not
+    // need the missing range.
+    in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
+        new long[]{483, 1625}, 1674, codec, 400);
+    inStream = new DataInputStream(in);
+    positions[303].reset();
+    in.seek(positions[303]);
+    for(int i=303; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+
+    // Without range 1: read values 0..299, then jump over the gap to 1001.
+    in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
+        new long[]{0, 1625}, 1674, codec, 400);
+    inStream = new DataInputStream(in);
+    positions[1001].reset();
+    for(int i=0; i < 300; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+    in.seek(positions[1001]);
+    for(int i=1001; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+  }
+
+  @Test
+  public void testUncompressedDisjointBuffers() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    OutStream out = new OutStream("test", 400, null, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    DataOutput stream = new DataOutputStream(out);
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      stream.writeInt(i);
+    }
+    out.flush();
+    assertEquals("test", out.toString());
+    // 1024 ints * 4 bytes each
+    assertEquals(4096, collect.buffer.size());
+    // Ranges [0,1024), [1024,3072), [3072,4096): value k starts at byte 4*k,
+    // so values 256 and 768 start exactly at ranges 1 and 2.
+    ByteBuffer[] inBuf = new ByteBuffer[3];
+    inBuf[0] = ByteBuffer.allocate(1100);
+    inBuf[1] = ByteBuffer.allocate(2200);
+    inBuf[2] = ByteBuffer.allocate(1100);
+    collect.buffer.setByteBuffer(inBuf[0], 0, 1024);
+    collect.buffer.setByteBuffer(inBuf[1], 1024, 2048);
+    collect.buffer.setByteBuffer(inBuf[2], 3072, 1024);
+
+    for(int i=0; i < inBuf.length; ++i) {
+      inBuf[i].flip();
+    }
+    InStream in = InStream.create("test", inBuf,
+        new long[]{0, 1024, 3072}, 4096, null, 400);
+    assertEquals("uncompressed stream test position: 0 length: 4096" +
+                 " range: 0 offset: 0 limit: 0",
+                 in.toString());
+    DataInputStream inStream = new DataInputStream(in);
+    for(int i=0; i < 1024; ++i) {
+      int x = inStream.readInt();
+      assertEquals(i, x);
+    }
+    assertEquals(0, in.available());
+    for(int i=1023; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(i, inStream.readInt());
+    }
+
+    // Without range 0: seek to value 256 and read to the end.
+    in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
+        new long[]{1024, 3072}, 4096, null, 400);
+    inStream = new DataInputStream(in);
+    positions[256].reset();
+    in.seek(positions[256]);
+    for(int i=256; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+
+    // Without range 1: read values 0..255, then jump over the gap to 768.
+    in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
+        new long[]{0, 3072}, 4096, null, 400);
+    inStream = new DataInputStream(in);
+    positions[768].reset();
+    for(int i=0; i < 256; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+    in.seek(positions[768]);
+    for(int i=768; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java b/orc/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
new file mode 100644
index 0000000..399f35e
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestIntegerCompressionReader.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+/**
+ * Tests sequential reads, seeks and skips of {@link RunLengthIntegerReaderV2}
+ * over data written by {@link RunLengthIntegerWriterV2}.
+ */
+public class TestIntegerCompressionReader {
+
+  /**
+   * The value runSeekTest writes at index {@code i}: repeated values
+   * ({@code i/4}) for the first 1024 indices, an arithmetic sequence
+   * ({@code 2*i}) for the next 1024, then the random {@code junk} values.
+   */
+  private static int expectedValue(int i, int[] junk) {
+    if (i < 1024) {
+      return i / 4;
+    } else if (i < 2048) {
+      return 2 * i;
+    }
+    return junk[i - 2048];
+  }
+
+  /**
+   * Writes 4096 values (runs, incrementing runs, non-runs), recording the
+   * stream position before each one. Then checks a full sequential read and
+   * a backwards seek to every recorded position, including the random
+   * non-run section.
+   *
+   * @param codec compression codec for the stream, or null for uncompressed
+   */
+  public void runSeekTest(CompressionCodec codec) throws Exception {
+    final int count = 4096;
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 1000, codec, collect), true);
+    TestInStream.PositionCollector[] positions =
+        new TestInStream.PositionCollector[count];
+    Random random = new Random(99);
+    int[] junk = new int[2048];
+    for(int i=0; i < junk.length; ++i) {
+      junk[i] = random.nextInt();
+    }
+    for(int i=0; i < count; ++i) {
+      positions[i] = new TestInStream.PositionCollector();
+      out.getPosition(positions[i]);
+      out.write(expectedValue(i, junk));
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in =
+        new RunLengthIntegerReaderV2(InStream.create
+            ("test", new ByteBuffer[]{inBuf},
+             new long[]{0}, inBuf.remaining(),
+             codec, 1000), true, false);
+    // Verify all written values; stopping at 2048 would leave the random
+    // (non-run) section unverified.
+    for(int i=0; i < count; ++i) {
+      int x = (int) in.next();
+      assertEquals(expectedValue(i, junk), x);
+    }
+    // Each recorded position is consumed exactly once here, so no reset().
+    for(int i=count - 1; i >= 0; --i) {
+      in.seek(positions[i]);
+      int x = (int) in.next();
+      assertEquals(expectedValue(i, junk), x);
+    }
+  }
+
+  @Test
+  public void testUncompressedSeek() throws Exception {
+    runSeekTest(null);
+  }
+
+  @Test
+  public void testCompressedSeek() throws Exception {
+    runSeekTest(new ZlibCodec());
+  }
+
+  /** Reads every tenth value, skipping the nine in between. */
+  @Test
+  public void testSkips() throws Exception {
+    TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+    RunLengthIntegerWriterV2 out = new RunLengthIntegerWriterV2(
+        new OutStream("test", 100, null, collect), true);
+    for(int i=0; i < 2048; ++i) {
+      if (i < 1024) {
+        out.write(i);
+      } else {
+        out.write(256 * i);
+      }
+    }
+    out.flush();
+    ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+    collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+    inBuf.flip();
+    RunLengthIntegerReaderV2 in =
+        new RunLengthIntegerReaderV2(InStream.create("test",
+                                                     new ByteBuffer[]{inBuf},
+                                                     new long[]{0},
+                                                     inBuf.remaining(),
+                                                     null, 100), true, false);
+    for(int i=0; i < 2048; i += 10) {
+      int x = (int) in.next();
+      if (i < 1024) {
+        assertEquals(i, x);
+      } else {
+        assertEquals(256 * i, x);
+      }
+      // no skip after the final read (i == 2040)
+      if (i < 2038) {
+        in.skip(9);
+      }
+      // a zero-length skip must not move the read position
+      in.skip(0);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestMemoryManager.java b/orc/src/test/org/apache/orc/impl/TestMemoryManager.java
new file mode 100644
index 0000000..f48c545
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestMemoryManager.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.impl.MemoryManager;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.lang.management.ManagementFactory;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test the ORC memory manager.
+ */
+public class TestMemoryManager {
+ private static final double ERROR = 0.000001;
+
+ private static class NullCallback implements MemoryManager.Callback {
+ public boolean checkMemory(double newScale) {
+ return false;
+ }
+ }
+
+ @Test
+ public void testBasics() throws Exception {
+ Configuration conf = new Configuration();
+ MemoryManager mgr = new MemoryManager(conf);
+ NullCallback callback = new NullCallback();
+ long poolSize = mgr.getTotalMemoryPool();
+ assertEquals(Math.round(ManagementFactory.getMemoryMXBean().
+ getHeapMemoryUsage().getMax() * 0.5d), poolSize);
+ assertEquals(1.0, mgr.getAllocationScale(), 0.00001);
+ mgr.addWriter(new Path("p1"), 1000, callback);
+ assertEquals(1.0, mgr.getAllocationScale(), 0.00001);
+ mgr.addWriter(new Path("p1"), poolSize / 2, callback);
+ assertEquals(1.0, mgr.getAllocationScale(), 0.00001);
+ mgr.addWriter(new Path("p2"), poolSize / 2, callback);
+ assertEquals(1.0, mgr.getAllocationScale(), 0.00001);
+ mgr.addWriter(new Path("p3"), poolSize / 2, callback);
+ assertEquals(0.6666667, mgr.getAllocationScale(), 0.00001);
+ mgr.addWriter(new Path("p4"), poolSize / 2, callback);
+ assertEquals(0.5, mgr.getAllocationScale(), 0.000001);
+ mgr.addWriter(new Path("p4"), 3 * poolSize / 2, callback);
+ assertEquals(0.3333333, mgr.getAllocationScale(), 0.000001);
+ mgr.removeWriter(new Path("p1"));
+ mgr.removeWriter(new Path("p2"));
+ assertEquals(0.5, mgr.getAllocationScale(), 0.00001);
+ mgr.removeWriter(new Path("p4"));
+ assertEquals(1.0, mgr.getAllocationScale(), 0.00001);
+ }
+
+ @Test
+ public void testConfig() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("hive.exec.orc.memory.pool", "0.9");
+ MemoryManager mgr = new MemoryManager(conf);
+ long mem =
+ ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+ System.err.print("Memory = " + mem);
+ long pool = mgr.getTotalMemoryPool();
+ assertTrue("Pool too small: " + pool, mem * 0.899 < pool);
+ assertTrue("Pool too big: " + pool, pool < mem * 0.901);
+ }
+
+ private static class DoubleMatcher extends BaseMatcher<Double> {
+ final double expected;
+ final double error;
+ DoubleMatcher(double expected, double error) {
+ this.expected = expected;
+ this.error = error;
+ }
+
+ @Override
+ public boolean matches(Object val) {
+ double dbl = (Double) val;
+ return Math.abs(dbl - expected) <= error;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("not sufficiently close to ");
+ description.appendText(Double.toString(expected));
+ }
+ }
+
+ private static DoubleMatcher closeTo(double value, double error) {
+ return new DoubleMatcher(value, error);
+ }
+
+ @Test
+ public void testCallback() throws Exception {
+ Configuration conf = new Configuration();
+ MemoryManager mgr = new MemoryManager(conf);
+ long pool = mgr.getTotalMemoryPool();
+ MemoryManager.Callback[] calls = new MemoryManager.Callback[20];
+ for(int i=0; i < calls.length; ++i) {
+ calls[i] = Mockito.mock(MemoryManager.Callback.class);
+ mgr.addWriter(new Path(Integer.toString(i)), pool/4, calls[i]);
+ }
+ // add enough rows to get the memory manager to check the limits
+ for(int i=0; i < 10000; ++i) {
+ mgr.addedRow(1);
+ }
+ for(int call=0; call < calls.length; ++call) {
+ Mockito.verify(calls[call], Mockito.times(2))
+ .checkMemory(Matchers.doubleThat(closeTo(0.2, ERROR)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestRunLengthByteReader.java b/orc/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
new file mode 100644
index 0000000..a14bef1
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestRunLengthByteReader.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+public class TestRunLengthByteReader {
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 100,
+ null, collect));
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[2048];
+ for(int i=0; i < 2048; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ if (i < 1024) {
+ out.write((byte) (i/4));
+ } else {
+ out.write((byte) i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 100));
+ for(int i=0; i < 2048; ++i) {
+ int x = in.next() & 0xff;
+ if (i < 1024) {
+ assertEquals((i/4) & 0xff, x);
+ } else {
+ assertEquals(i & 0xff, x);
+ }
+ }
+ for(int i=2047; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = in.next() & 0xff;
+ if (i < 1024) {
+ assertEquals((i/4) & 0xff, x);
+ } else {
+ assertEquals(i & 0xff, x);
+ }
+ }
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ CompressionCodec codec = new SnappyCodec();
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 500,
+ codec, collect));
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[2048];
+ for(int i=0; i < 2048; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ if (i < 1024) {
+ out.write((byte) (i/4));
+ } else {
+ out.write((byte) i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), codec, 500));
+ for(int i=0; i < 2048; ++i) {
+ int x = in.next() & 0xff;
+ if (i < 1024) {
+ assertEquals((i/4) & 0xff, x);
+ } else {
+ assertEquals(i & 0xff, x);
+ }
+ }
+ for(int i=2047; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = in.next() & 0xff;
+ if (i < 1024) {
+ assertEquals((i/4) & 0xff, x);
+ } else {
+ assertEquals(i & 0xff, x);
+ }
+ }
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthByteWriter out = new RunLengthByteWriter(new OutStream("test", 100,
+ null, collect));
+ for(int i=0; i < 2048; ++i) {
+ if (i < 1024) {
+ out.write((byte) (i/16));
+ } else {
+ out.write((byte) i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthByteReader in = new RunLengthByteReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 100));
+ for(int i=0; i < 2048; i += 10) {
+ int x = in.next() & 0xff;
+ if (i < 1024) {
+ assertEquals((i/16) & 0xff, x);
+ } else {
+ assertEquals(i & 0xff, x);
+ }
+ if (i < 2038) {
+ in.skip(9);
+ }
+ in.skip(0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java b/orc/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
new file mode 100644
index 0000000..28239ba
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestRunLengthIntegerReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static junit.framework.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.orc.CompressionCodec;
+import org.junit.Test;
+
+public class TestRunLengthIntegerReader {
+
+ public void runSeekTest(CompressionCodec codec) throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthIntegerWriter out = new RunLengthIntegerWriter(
+ new OutStream("test", 1000, codec, collect), true);
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[4096];
+ Random random = new Random(99);
+ int[] junk = new int[2048];
+ for(int i=0; i < junk.length; ++i) {
+ junk[i] = random.nextInt();
+ }
+ for(int i=0; i < 4096; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ // test runs, incrementing runs, non-runs
+ if (i < 1024) {
+ out.write(i/4);
+ } else if (i < 2048) {
+ out.write(2*i);
+ } else {
+ out.write(junk[i-2048]);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
+ ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
+ codec, 1000), true);
+ for(int i=0; i < 2048; ++i) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ for(int i=2047; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i/4, x);
+ } else if (i < 2048) {
+ assertEquals(2*i, x);
+ } else {
+ assertEquals(junk[i-2048], x);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ RunLengthIntegerWriter out = new RunLengthIntegerWriter(
+ new OutStream("test", 100, null, collect), true);
+ for(int i=0; i < 2048; ++i) {
+ if (i < 1024) {
+ out.write(i);
+ } else {
+ out.write(256 * i);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ RunLengthIntegerReader in = new RunLengthIntegerReader(InStream.create
+ ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
+ null, 100), true);
+ for(int i=0; i < 2048; i += 10) {
+ int x = (int) in.next();
+ if (i < 1024) {
+ assertEquals(i, x);
+ } else {
+ assertEquals(256 * i, x);
+ }
+ if (i < 2038) {
+ in.skip(9);
+ }
+ in.skip(0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
new file mode 100644
index 0000000..0785412
--- /dev/null
+++ b/orc/src/test/org/apache/orc/impl/TestSerializationUtils.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.math.BigInteger;
+
+import org.junit.Test;
+
+import com.google.common.math.LongMath;
+
+public class TestSerializationUtils {
+
+ private InputStream fromBuffer(ByteArrayOutputStream buffer) {
+ return new ByteArrayInputStream(buffer.toByteArray());
+ }
+
+ @Test
+ public void testDoubles() throws Exception {
+ double tolerance = 0.0000000000000001;
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ SerializationUtils utils = new SerializationUtils();
+ utils.writeDouble(buffer, 1343822337.759);
+ assertEquals(1343822337.759, utils.readDouble(fromBuffer(buffer)), tolerance);
+ buffer = new ByteArrayOutputStream();
+ utils.writeDouble(buffer, 0.8);
+ double got = utils.readDouble(fromBuffer(buffer));
+ assertEquals(0.8, got, tolerance);
+ }
+
+ @Test
+ public void testBigIntegers() throws Exception {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ SerializationUtils.writeBigInteger(buffer, BigInteger.valueOf(0));
+ assertArrayEquals(new byte[]{0}, buffer.toByteArray());
+ assertEquals(0L,
+ SerializationUtils.readBigInteger(fromBuffer(buffer)).longValue());
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer, BigInteger.valueOf(1));
+ assertArrayEquals(new byte[]{2}, buffer.toByteArray());
+ assertEquals(1L,
+ SerializationUtils.readBigInteger(fromBuffer(buffer)).longValue());
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer, BigInteger.valueOf(-1));
+ assertArrayEquals(new byte[]{1}, buffer.toByteArray());
+ assertEquals(-1L,
+ SerializationUtils.readBigInteger(fromBuffer(buffer)).longValue());
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer, BigInteger.valueOf(50));
+ assertArrayEquals(new byte[]{100}, buffer.toByteArray());
+ assertEquals(50L,
+ SerializationUtils.readBigInteger(fromBuffer(buffer)).longValue());
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer, BigInteger.valueOf(-50));
+ assertArrayEquals(new byte[]{99}, buffer.toByteArray());
+ assertEquals(-50L,
+ SerializationUtils.readBigInteger(fromBuffer(buffer)).longValue());
+ for(int i=-8192; i < 8192; ++i) {
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer, BigInteger.valueOf(i));
+ assertEquals("compare length for " + i,
+ i >= -64 && i < 64 ? 1 : 2, buffer.size());
+ assertEquals("compare result for " + i,
+ i, SerializationUtils.readBigInteger(fromBuffer(buffer)).intValue());
+ }
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer,
+ new BigInteger("123456789abcdef0",16));
+ assertEquals(new BigInteger("123456789abcdef0",16),
+ SerializationUtils.readBigInteger(fromBuffer(buffer)));
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer,
+ new BigInteger("-123456789abcdef0",16));
+ assertEquals(new BigInteger("-123456789abcdef0",16),
+ SerializationUtils.readBigInteger(fromBuffer(buffer)));
+ StringBuilder buf = new StringBuilder();
+ for(int i=0; i < 256; ++i) {
+ String num = Integer.toHexString(i);
+ if (num.length() == 1) {
+ buf.append('0');
+ }
+ buf.append(num);
+ }
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer,
+ new BigInteger(buf.toString(),16));
+ assertEquals(new BigInteger(buf.toString(),16),
+ SerializationUtils.readBigInteger(fromBuffer(buffer)));
+ buffer.reset();
+ SerializationUtils.writeBigInteger(buffer,
+ new BigInteger("ff000000000000000000000000000000000000000000ff",16));
+ assertEquals(
+ new BigInteger("ff000000000000000000000000000000000000000000ff",16),
+ SerializationUtils.readBigInteger(fromBuffer(buffer)));
+ }
+
+ @Test
+ public void testSubtractionOverflow() {
+ // cross check results with Guava results below
+ SerializationUtils utils = new SerializationUtils();
+ assertEquals(false, utils.isSafeSubtract(22222222222L, Long.MIN_VALUE));
+ assertEquals(false, utils.isSafeSubtract(-22222222222L, Long.MAX_VALUE));
+ assertEquals(false, utils.isSafeSubtract(Long.MIN_VALUE, Long.MAX_VALUE));
+ assertEquals(true, utils.isSafeSubtract(-1553103058346370095L, 6553103058346370095L));
+ assertEquals(true, utils.isSafeSubtract(0, Long.MAX_VALUE));
+ assertEquals(true, utils.isSafeSubtract(Long.MIN_VALUE, 0));
+ }
+
+ @Test
+ public void testSubtractionOverflowGuava() {
+ try {
+ LongMath.checkedSubtract(22222222222L, Long.MIN_VALUE);
+ fail("expected ArithmeticException for overflow");
+ } catch (ArithmeticException ex) {
+ assertEquals(ex.getMessage(), "overflow");
+ }
+
+ try {
+ LongMath.checkedSubtract(-22222222222L, Long.MAX_VALUE);
+ fail("expected ArithmeticException for overflow");
+ } catch (ArithmeticException ex) {
+ assertEquals(ex.getMessage(), "overflow");
+ }
+
+ try {
+ LongMath.checkedSubtract(Long.MIN_VALUE, Long.MAX_VALUE);
+ fail("expected ArithmeticException for overflow");
+ } catch (ArithmeticException ex) {
+ assertEquals(ex.getMessage(), "overflow");
+ }
+
+ assertEquals(-8106206116692740190L,
+ LongMath.checkedSubtract(-1553103058346370095L, 6553103058346370095L));
+ assertEquals(-Long.MAX_VALUE, LongMath.checkedSubtract(0, Long.MAX_VALUE));
+ assertEquals(Long.MIN_VALUE, LongMath.checkedSubtract(Long.MIN_VALUE, 0));
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestSerializationUtils test = new TestSerializationUtils();
+ test.testDoubles();
+ test.testBigIntegers();
+ }
+}