You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2013/08/12 17:03:31 UTC
svn commit: r1513155 [1/3] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/ ql/...
Author: omalley
Date: Mon Aug 12 15:03:30 2013
New Revision: 1513155
URL: http://svn.apache.org/r1513155
Log:
HIVE-4123 Improved ORC integer RLE version 2. (Prasanth Jayachandran via
omalley)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
hive/trunk/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
hive/trunk/ql/src/test/resources/orc-file-dump.out
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 12 15:03:30 2013
@@ -502,6 +502,9 @@ public class HiveConf extends Configurat
// Maximum fraction of heap that can be used by ORC file writers
HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+ // use 0.11 version of RLE encoding. if this conf is not defined or any
+ // other value specified, ORC will use the new RLE encoding
+ HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", "0.11"),
HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f),
Modified: hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java (original)
+++ hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java Mon Aug 12 15:03:30 2013
@@ -5695,10 +5695,14 @@ public final class OrcProto {
implements com.google.protobuf.ProtocolMessageEnum {
DIRECT(0, 0),
DICTIONARY(1, 1),
+ DIRECT_V2(2, 2),
+ DICTIONARY_V2(3, 3),
;
public static final int DIRECT_VALUE = 0;
public static final int DICTIONARY_VALUE = 1;
+ public static final int DIRECT_V2_VALUE = 2;
+ public static final int DICTIONARY_V2_VALUE = 3;
public final int getNumber() { return value; }
@@ -5707,6 +5711,8 @@ public final class OrcProto {
switch (value) {
case 0: return DIRECT;
case 1: return DICTIONARY;
+ case 2: return DIRECT_V2;
+ case 3: return DICTIONARY_V2;
default: return null;
}
}
@@ -5737,7 +5743,7 @@ public final class OrcProto {
}
private static final Kind[] VALUES = {
- DIRECT, DICTIONARY,
+ DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2,
};
public static Kind valueOf(
@@ -11117,42 +11123,42 @@ public final class OrcProto {
"eam.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004",
"\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGT" +
"H\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_C" +
- "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n" +
+ "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n" +
"\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.apac" +
"he.hadoop.hive.ql.io.orc.ColumnEncoding." +
- "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006" +
- "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFoote" +
- "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" +
- "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." +
- "org.apache.hadoop.hive.ql.io.orc.ColumnE",
- "ncoding\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" +
- "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s" +
- "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001" +
- "\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002" +
- "\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE" +
- "\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020" +
- "\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNIO" +
- "N\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInfo" +
- "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" +
- " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength",
- "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" +
- "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002" +
- "\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconten" +
- "tLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apa" +
- "che.hadoop.hive.ql.io.orc.StripeInformat" +
- "ion\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.h" +
- "ive.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.o" +
- "rg.apache.hadoop.hive.ql.io.orc.UserMeta" +
- "dataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatis" +
- "tics\030\007 \003(\01322.org.apache.hadoop.hive.ql.i",
- "o.orc.ColumnStatistics\022\026\n\016rowIndexStride" +
- "\030\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 " +
- "\001(\004\022F\n\013compression\030\002 \001(\01621.org.apache.ha" +
- "doop.hive.ql.io.orc.CompressionKind\022\034\n\024c" +
- "ompressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003" +
- "(\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKin" +
- "d\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO" +
- "\020\003"
+ "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006" +
+ "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022" +
+ "\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n\007s" +
+ "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" +
+ ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap",
+ "ache.hadoop.hive.ql.io.orc.ColumnEncodin" +
+ "g\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" +
+ "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" +
+ "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001\n\004Kind" +
+ "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" +
+ "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" +
+ "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" +
+ "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" +
+ "\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInformatio" +
+ "n\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022",
+ "\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004" +
+ "\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataIt" +
+ "em\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Foot" +
+ "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" +
+ "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" +
+ "doop.hive.ql.io.orc.StripeInformation\0225\n" +
+ "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql" +
+ ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" +
+ "che.hadoop.hive.ql.io.orc.UserMetadataIt" +
+ "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007",
+ " \003(\01322.org.apache.hadoop.hive.ql.io.orc." +
+ "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" +
+ "\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" +
+ "\013compression\030\002 \001(\01621.org.apache.hadoop.h" +
+ "ive.ql.io.orc.CompressionKind\022\034\n\024compres" +
+ "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" +
+ "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" +
+ "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading integers.
+ */
+interface IntegerReader {
+
+ /**
+ * Seek to the position provided by index.
+ * @param index
+ * @throws IOException
+ */
+ void seek(PositionProvider index) throws IOException;
+
+ /**
+ * Skip number of specified rows.
+ * @param numValues
+ * @throws IOException
+ */
+ void skip(long numValues) throws IOException;
+
+ /**
+ * Check if there are any more values left.
+ * @return
+ * @throws IOException
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return the next available value.
+ * @return
+ * @throws IOException
+ */
+ long next() throws IOException;
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java Mon Aug 12 15:03:30 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+interface IntegerWriter {
+
+ /**
+ * Get position from the stream.
+ * @param recorder
+ * @throws IOException
+ */
+ void getPosition(PositionRecorder recorder) throws IOException;
+
+ /**
+ * Write the integer value
+ * @param value
+ * @throws IOException
+ */
+ void write(long value) throws IOException;
+
+ /**
+ * Flush the buffer
+ * @throws IOException
+ */
+ void flush() throws IOException;
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Aug 12 15:03:30 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -130,6 +131,21 @@ class RecordReaderImpl implements Record
}
}
+ IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+ InStream in,
+ boolean signed) throws IOException {
+ switch (kind) {
+ case DIRECT_V2:
+ case DICTIONARY_V2:
+ return new RunLengthIntegerReaderV2(in, signed);
+ case DIRECT:
+ case DICTIONARY:
+ return new RunLengthIntegerReader(in, signed);
+ default:
+ throw new IllegalArgumentException("Unknown encoding " + kind);
+ }
+ }
+
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encoding
) throws IOException {
@@ -266,20 +282,29 @@ class RecordReaderImpl implements Record
}
private static class ShortTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
ShortTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -310,20 +335,29 @@ class RecordReaderImpl implements Record
}
private static class IntTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
IntTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -354,20 +388,29 @@ class RecordReaderImpl implements Record
}
private static class LongTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
LongTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -492,13 +535,22 @@ class RecordReaderImpl implements Record
private static class BinaryTreeReader extends TreeReader{
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
BinaryTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -506,9 +558,8 @@ class RecordReaderImpl implements Record
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
- false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
}
@Override
@@ -555,22 +606,33 @@ class RecordReaderImpl implements Record
}
private static class TimestampTreeReader extends TreeReader{
- private RunLengthIntegerReader data;
- private RunLengthIntegerReader nanos;
+ private IntegerReader data = null;
+ private IntegerReader nanos = null;
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.DATA)), true);
- nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.SECONDARY)), false);
+ data = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
}
@Override
@@ -625,20 +687,29 @@ class RecordReaderImpl implements Record
}
private static class DateTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
DateTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -670,20 +741,29 @@ class RecordReaderImpl implements Record
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
- private RunLengthIntegerReader scaleStream;
+ private IntegerReader scaleStream = null;
DecimalTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
valueStream = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
- scaleStream = new RunLengthIntegerReader(streams.get(
+ scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
}
@@ -726,12 +806,9 @@ class RecordReaderImpl implements Record
super(path, columnId);
}
+ @Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
- encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
- throw new IOException("Unknown encoding " + encoding + " in column " +
- columnId + " of " + path);
- }
+ reader.checkEncoding(encoding);
}
@Override
@@ -742,9 +819,11 @@ class RecordReaderImpl implements Record
// reader
switch (encodings.get(columnId).getKind()) {
case DIRECT:
+ case DIRECT_V2:
reader = new StringDirectTreeReader(path, columnId);
break;
case DICTIONARY:
+ case DICTIONARY_V2:
reader = new StringDictionaryTreeReader(path, columnId);
break;
default:
@@ -776,7 +855,7 @@ class RecordReaderImpl implements Record
*/
private static class StringDirectTreeReader extends TreeReader {
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths;
StringDirectTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -784,7 +863,11 @@ class RecordReaderImpl implements Record
@Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- // PASS
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
}
@Override
@@ -795,8 +878,8 @@ class RecordReaderImpl implements Record
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
false);
}
@@ -851,7 +934,7 @@ class RecordReaderImpl implements Record
private static class StringDictionaryTreeReader extends TreeReader {
private DynamicByteArray dictionaryBuffer;
private int[] dictionaryOffsets;
- private RunLengthIntegerReader reader;
+ private IntegerReader reader;
StringDictionaryTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -859,7 +942,11 @@ class RecordReaderImpl implements Record
@Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- // PASS
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
}
@Override
@@ -884,7 +971,8 @@ class RecordReaderImpl implements Record
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = streams.get(name);
- RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
+ .getKind(), in, false);
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
@@ -899,7 +987,8 @@ class RecordReaderImpl implements Record
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), false);
+ reader = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(name), false);
}
@Override
@@ -1101,7 +1190,7 @@ class RecordReaderImpl implements Record
private static class ListTreeReader extends TreeReader {
private final TreeReader elementReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
ListTreeReader(Path path, int columnId,
List<OrcProto.Type> types,
@@ -1150,12 +1239,22 @@ class RecordReaderImpl implements Record
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
if (elementReader != null) {
elementReader.startStripe(streams, encodings);
}
@@ -1175,7 +1274,7 @@ class RecordReaderImpl implements Record
private static class MapTreeReader extends TreeReader {
private final TreeReader keyReader;
private final TreeReader valueReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
MapTreeReader(Path path,
int columnId,
@@ -1228,12 +1327,22 @@ class RecordReaderImpl implements Record
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
if (keyReader != null) {
keyReader.startStripe(streams, encodings);
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig?rev=1513155&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig Mon Aug 12 15:03:30 2013
@@ -0,0 +1,1505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+// Row-by-row reader over the stripes of a single ORC file. This is the
+// archived .orig copy embedded in the commit diff; comments are review-only.
+class RecordReaderImpl implements RecordReader {
+ private final FSDataInputStream file;
+ // absolute row number of the first row this reader will return
+ private final long firstRow;
+ // only the stripes overlapping the requested [offset, offset+length) range
+ private final List<StripeInformation> stripes =
+ new ArrayList<StripeInformation>();
+ private OrcProto.StripeFooter stripeFooter;
+ private final long totalRowCount;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ // per-column include flags; null means "read every column"
+ private final boolean[] included;
+ private final long rowIndexStride;
+ private long rowInStripe = 0;
+ private int currentStripe = 0;
+ private long rowBaseInStripe = 0;
+ private long rowCountInStripe = 0;
+ private final Map<StreamName, InStream> streams =
+ new HashMap<StreamName, InStream>();
+ // root of the TreeReader hierarchy (column 0 = top-level struct)
+ private final TreeReader reader;
+ private final OrcProto.RowIndex[] indexes;
+
+ /**
+ * Builds a reader over the stripes that overlap [offset, offset+length).
+ * Stripes starting before offset are skipped but their row counts are
+ * accumulated into firstRow; the first selected stripe is read eagerly.
+ */
+ RecordReaderImpl(Iterable<StripeInformation> stripes,
+ FileSystem fileSystem,
+ Path path,
+ long offset, long length,
+ List<OrcProto.Type> types,
+ CompressionCodec codec,
+ int bufferSize,
+ boolean[] included,
+ long strideRate
+ ) throws IOException {
+ this.file = fileSystem.open(path);
+ this.codec = codec;
+ this.bufferSize = bufferSize;
+ this.included = included;
+ long rows = 0;
+ long skippedRows = 0;
+ for(StripeInformation stripe: stripes) {
+ long stripeStart = stripe.getOffset();
+ if (offset > stripeStart) {
+ // stripe begins before the split: count its rows as skipped
+ skippedRows += stripe.getNumberOfRows();
+ } else if (stripeStart < offset + length) {
+ this.stripes.add(stripe);
+ rows += stripe.getNumberOfRows();
+ }
+ }
+ firstRow = skippedRows;
+ totalRowCount = rows;
+ reader = createTreeReader(path, 0, types, included);
+ indexes = new OrcProto.RowIndex[types.size()];
+ rowIndexStride = strideRate;
+ if (this.stripes.size() > 0) {
+ readStripe();
+ }
+ }
+
+ // Hands out the position values of a single RowIndexEntry in order;
+ // each stream's seek() consumes as many positions as it recorded.
+ private static final class PositionProviderImpl implements PositionProvider {
+ private final OrcProto.RowIndexEntry entry;
+ private int index = 0;
+
+ PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+ this.entry = entry;
+ }
+
+ @Override
+ public long getNext() {
+ // no bounds check: callers must consume exactly the recorded positions
+ return entry.getPositions(index++);
+ }
+ }
+
+ // Base class for per-column readers. Handles the optional PRESENT
+ // (null bitmap) stream; subclasses read the actual values.
+ private abstract static class TreeReader {
+ protected final Path path;
+ protected final int columnId;
+ // null when the column has no PRESENT stream (i.e. no nulls in stripe)
+ private BitFieldReader present = null;
+ // set by next(): whether the current row's value is non-null
+ protected boolean valuePresent = false;
+
+ TreeReader(Path path, int columnId) {
+ this.path = path;
+ this.columnId = columnId;
+ }
+
+ // Default: only DIRECT encoding is legal; subclasses override as needed.
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ // Reset state for a new stripe: validate encoding and (re)bind the
+ // PRESENT stream if one exists; otherwise every value is non-null.
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encoding
+ ) throws IOException {
+ checkEncoding(encoding.get(columnId));
+ InStream in = streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.PRESENT));
+ if (in == null) {
+ present = null;
+ valuePresent = true;
+ } else {
+ present = new BitFieldReader(in, 1);
+ }
+ }
+
+ /**
+ * Seek to the given position.
+ * @param index the indexes loaded from the file
+ * @throws IOException
+ */
+ void seek(PositionProvider[] index) throws IOException {
+ if (present != null) {
+ present.seek(index[columnId]);
+ }
+ }
+
+ // Consumes 'rows' PRESENT bits and returns how many were non-null;
+ // with no PRESENT stream all rows are non-null.
+ protected long countNonNulls(long rows) throws IOException {
+ if (present != null) {
+ long result = 0;
+ for(long c=0; c < rows; ++c) {
+ if (present.next() == 1) {
+ result += 1;
+ }
+ }
+ return result;
+ } else {
+ return rows;
+ }
+ }
+
+ abstract void skipRows(long rows) throws IOException;
+
+ // Advances the PRESENT stream one row; subclasses read the value when
+ // valuePresent is true and reuse 'previous' as the result holder.
+ Object next(Object previous) throws IOException {
+ if (present != null) {
+ valuePresent = present.next() == 1;
+ }
+ return previous;
+ }
+ }
+
+ // Reads BOOLEAN columns: values are a 1-bit-per-value DATA stream.
+ private static class BooleanTreeReader extends TreeReader{
+ private BitFieldReader reader = null;
+
+ BooleanTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ reader = new BitFieldReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), 1);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ // only non-null rows have bits in the DATA stream
+ reader.skip(countNonNulls(items));
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ BooleanWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new BooleanWritable();
+ } else {
+ result = (BooleanWritable) previous;
+ }
+ result.set(reader.next() == 1);
+ }
+ return result;
+ }
+ }
+
+ // Reads BYTE (tinyint) columns via a run-length byte DATA stream.
+ private static class ByteTreeReader extends TreeReader{
+ private RunLengthByteReader reader = null;
+
+ ByteTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ reader = new RunLengthByteReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)));
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ ByteWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new ByteWritable();
+ } else {
+ result = (ByteWritable) previous;
+ }
+ result.set(reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ // Reads SHORT columns: signed RLE v1 integers, narrowed to short on read.
+ private static class ShortTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ ShortTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ // 'true' = values are zigzag/signed
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ ShortWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new ShortWritable();
+ } else {
+ result = (ShortWritable) previous;
+ }
+ result.set((short) reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ // Reads INT columns: signed RLE v1 integers, narrowed to int on read.
+ private static class IntTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ IntTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ IntWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new IntWritable();
+ } else {
+ result = (IntWritable) previous;
+ }
+ result.set((int) reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ // Reads LONG (bigint) columns: signed RLE v1 integers.
+ private static class LongTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ LongTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ LongWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new LongWritable();
+ } else {
+ result = (LongWritable) previous;
+ }
+ result.set(reader.next());
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ // Reads FLOAT columns: raw IEEE floats decoded via SerializationUtils.
+ private static class FloatTreeReader extends TreeReader{
+ private InStream stream;
+
+ FloatTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ FloatWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new FloatWritable();
+ } else {
+ result = (FloatWritable) previous;
+ }
+ result.set(SerializationUtils.readFloat(stream));
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // skips by decoding each value rather than stream.skip(items * 4)
+ for(int i=0; i < items; ++i) {
+ SerializationUtils.readFloat(stream);
+ }
+ }
+ }
+
+ // Reads DOUBLE columns: raw IEEE doubles, 8 bytes per non-null value.
+ private static class DoubleTreeReader extends TreeReader{
+ private InStream stream;
+
+ DoubleTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name =
+ new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ DoubleWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new DoubleWritable();
+ } else {
+ result = (DoubleWritable) previous;
+ }
+ result.set(SerializationUtils.readDouble(stream));
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // fixed-width encoding allows a direct byte skip
+ stream.skip(items * 8);
+ }
+ }
+
+ // Reads BINARY columns: concatenated bytes in DATA, per-value byte
+ // counts in an unsigned LENGTH stream.
+ private static class BinaryTreeReader extends TreeReader{
+ private InStream stream;
+ private RunLengthIntegerReader lengths;
+
+ BinaryTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ lengths = new RunLengthIntegerReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ lengths.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ BytesWritable result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new BytesWritable();
+ } else {
+ result = (BytesWritable) previous;
+ }
+ int len = (int) lengths.next();
+ result.setSize(len);
+ int offset = 0;
+ // loop until the full value is read; read() may return short counts
+ while (len > 0) {
+ int written = stream.read(result.getBytes(), offset, len);
+ if (written < 0) {
+ throw new EOFException("Can't finish byte read from " + stream);
+ }
+ len -= written;
+ offset += written;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // sum the lengths of the skipped values, then skip the bytes in one go
+ long lengthToSkip = 0;
+ for(int i=0; i < items; ++i) {
+ lengthToSkip += lengths.next();
+ }
+ stream.skip(lengthToSkip);
+ }
+ }
+
+ // Reads TIMESTAMP columns: seconds (relative to BASE_TIMESTAMP) in DATA,
+ // compacted nanoseconds in SECONDARY.
+ private static class TimestampTreeReader extends TreeReader{
+ private RunLengthIntegerReader data;
+ private RunLengthIntegerReader nanos;
+
+ TimestampTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ data.seek(index[columnId]);
+ nanos.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Timestamp result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new Timestamp(0);
+ } else {
+ result = (Timestamp) previous;
+ }
+ long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
+ WriterImpl.MILLIS_PER_SECOND;
+ int newNanos = parseNanos(nanos.next());
+ // fix the rounding when we divided by 1000.
+ // NOTE(review): the negative branch subtracts the nano-derived millis
+ // instead of adding them — looks asymmetric for pre-epoch timestamps;
+ // confirm against the writer's encoding before relying on it.
+ if (millis >= 0) {
+ millis += newNanos / 1000000;
+ } else {
+ millis -= newNanos / 1000000;
+ }
+ result.setTime(millis);
+ result.setNanos(newNanos);
+ }
+ return result;
+ }
+
+ // Undoes the writer's nano compaction: the low 3 bits hold a count of
+ // removed trailing decimal zeros (count-1), the rest is the value.
+ private static int parseNanos(long serialized) {
+ int zeros = 7 & (int) serialized;
+ int result = (int) serialized >>> 3;
+ if (zeros != 0) {
+ // restore zeros+1 trailing zeros (the loop runs zeros+1 times)
+ for(int i =0; i <= zeros; ++i) {
+ result *= 10;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ data.skip(items);
+ nanos.skip(items);
+ }
+ }
+
+ // Reads DATE columns: days-since-epoch as signed RLE integers,
+ // converted to millis via DateWritable.
+ private static class DateTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ DateTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Date result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new Date(0);
+ } else {
+ result = (Date) previous;
+ }
+ result.setTime(DateWritable.daysToMillis((int) reader.next()));
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ // Reads DECIMAL columns: unscaled BigInteger values in DATA, per-value
+ // scales as signed RLE integers in SECONDARY.
+ private static class DecimalTreeReader extends TreeReader{
+ private InStream valueStream;
+ private RunLengthIntegerReader scaleStream;
+
+ DecimalTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ valueStream = streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA));
+ scaleStream = new RunLengthIntegerReader(streams.get(
+ new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ valueStream.seek(index[columnId]);
+ scaleStream.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ // unlike the other readers, always allocates a fresh HiveDecimal
+ // (HiveDecimal is immutable, so 'previous' cannot be reused)
+ if (valuePresent) {
+ return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
+ (int) scaleStream.next());
+ }
+ return null;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // BigIntegers are variable length: must decode each one to skip it
+ for(int i=0; i < items; i++) {
+ SerializationUtils.readBigInteger(valueStream);
+ }
+ scaleStream.skip(items);
+ }
+ }
+
+ /**
+ * A tree reader that will read string columns. At the start of the
+ * stripe, it creates an internal reader based on whether a direct or
+ * dictionary encoding was used.
+ */
+ private static class StringTreeReader extends TreeReader {
+ // delegate chosen per stripe (direct vs dictionary)
+ private TreeReader reader;
+
+ StringTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ // NOTE(review): missing @Override annotation, unlike sibling overrides.
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ // For each stripe, checks the encoding and initializes the appropriate
+ // reader
+ switch (encodings.get(columnId).getKind()) {
+ case DIRECT:
+ reader = new StringDirectTreeReader(path, columnId);
+ break;
+ case DICTIONARY:
+ reader = new StringDictionaryTreeReader(path, columnId);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported encoding " +
+ encodings.get(columnId).getKind());
+ }
+ reader.startStripe(streams, encodings);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ reader.seek(index);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ return reader.next(previous);
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skipRows(items);
+ }
+ }
+
+ /**
+ * A reader for string columns that are direct encoded in the current
+ * stripe. UTF-8 bytes live in DATA; per-value byte counts in LENGTH.
+ */
+ private static class StringDirectTreeReader extends TreeReader {
+ private InStream stream;
+ private RunLengthIntegerReader lengths;
+
+ StringDirectTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ // PASS -- the outer StringTreeReader already validated the encoding
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ lengths = new RunLengthIntegerReader(streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ lengths.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Text result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new Text();
+ } else {
+ result = (Text) previous;
+ }
+ int len = (int) lengths.next();
+ int offset = 0;
+ byte[] bytes = new byte[len];
+ // loop because read() may return fewer bytes than requested
+ while (len > 0) {
+ int written = stream.read(bytes, offset, len);
+ if (written < 0) {
+ throw new EOFException("Can't finish byte read from " + stream);
+ }
+ len -= written;
+ offset += written;
+ }
+ result.set(bytes);
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // accumulate skipped byte count, then advance DATA once
+ long lengthToSkip = 0;
+ for(int i=0; i < items; ++i) {
+ lengthToSkip += lengths.next();
+ }
+ stream.skip(lengthToSkip);
+ }
+ }
+
+ /**
+ * A reader for string columns that are dictionary encoded in the current
+ * stripe. The whole dictionary blob is loaded up front; DATA holds
+ * per-row dictionary indexes.
+ */
+ private static class StringDictionaryTreeReader extends TreeReader {
+ private DynamicByteArray dictionaryBuffer;
+ // offsets[i]..offsets[i+1] delimit entry i in dictionaryBuffer
+ private int[] dictionaryOffsets;
+ private RunLengthIntegerReader reader;
+
+ StringDictionaryTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ // PASS -- the outer StringTreeReader already validated the encoding
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+
+ // read the dictionary blob
+ int dictionarySize = encodings.get(columnId).getDictionarySize();
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ InStream in = streams.get(name);
+ if (in.available() > 0) {
+ dictionaryBuffer = new DynamicByteArray(64, in.available());
+ dictionaryBuffer.readAll(in);
+ } else {
+ // all dictionary entries are empty strings
+ dictionaryBuffer = null;
+ }
+ in.close();
+
+ // read the lengths and convert them to cumulative offsets
+ name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
+ in = streams.get(name);
+ RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ int offset = 0;
+ if (dictionaryOffsets == null ||
+ dictionaryOffsets.length < dictionarySize + 1) {
+ dictionaryOffsets = new int[dictionarySize + 1];
+ }
+ for(int i=0; i < dictionarySize; ++i) {
+ dictionaryOffsets[i] = offset;
+ offset += (int) lenReader.next();
+ }
+ // sentinel entry so entry i's length is offsets[i+1] - offsets[i]
+ dictionaryOffsets[dictionarySize] = offset;
+ in.close();
+
+ // set up the row reader
+ name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Text result = null;
+ if (valuePresent) {
+ int entry = (int) reader.next();
+ if (previous == null) {
+ result = new Text();
+ } else {
+ result = (Text) previous;
+ }
+ int offset = dictionaryOffsets[entry];
+ int length;
+ // if it isn't the last entry, subtract the offsets otherwise use
+ // the buffer length.
+ if (entry < dictionaryOffsets.length - 1) {
+ length = dictionaryOffsets[entry + 1] - offset;
+ } else {
+ length = dictionaryBuffer.size() - offset;
+ }
+ // If the column is just empty strings, the size will be zero,
+ // so the buffer will be null, in that case just return result
+ // as it will default to empty
+ if (dictionaryBuffer != null) {
+ dictionaryBuffer.setText(result, offset, length);
+ } else {
+ result.clear();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
+ // Reads STRUCT columns: one child TreeReader per field; a field excluded
+ // by 'included' leaves a null slot in 'fields'.
+ private static class StructTreeReader extends TreeReader {
+ private final TreeReader[] fields;
+ private final String[] fieldNames;
+
+ StructTreeReader(Path path, int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(path, columnId);
+ OrcProto.Type type = types.get(columnId);
+ int fieldCount = type.getFieldNamesCount();
+ this.fields = new TreeReader[fieldCount];
+ this.fieldNames = new String[fieldCount];
+ for(int i=0; i < fieldCount; ++i) {
+ int subtype = type.getSubtypes(i);
+ if (included == null || included[subtype]) {
+ this.fields[i] = createTreeReader(path, subtype, types, included);
+ }
+ this.fieldNames[i] = type.getFieldNames(i);
+ }
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ for(TreeReader kid: fields) {
+ if (kid != null) {
+ kid.seek(index);
+ }
+ }
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ OrcStruct result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new OrcStruct(fields.length);
+ } else {
+ result = (OrcStruct) previous;
+
+ // If the input format was initialized with a file with a
+ // different number of fields, the number of fields needs to
+ // be updated to the correct number
+ if (result.getNumFields() != fields.length) {
+ result.setNumFields(fields.length);
+ }
+ }
+ for(int i=0; i < fields.length; ++i) {
+ if (fields[i] != null) {
+ result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ for(TreeReader field: fields) {
+ if (field != null) {
+ field.startStripe(streams, encodings);
+ }
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ for(TreeReader field: fields) {
+ if (field != null) {
+ field.skipRows(items);
+ }
+ }
+ }
+ }
+
+ // Reads UNION columns: a byte 'tag' stream in DATA selects which child
+ // reader supplies each row's value.
+ private static class UnionTreeReader extends TreeReader {
+ private final TreeReader[] fields;
+ private RunLengthByteReader tags;
+
+ UnionTreeReader(Path path, int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(path, columnId);
+ OrcProto.Type type = types.get(columnId);
+ int fieldCount = type.getSubtypesCount();
+ this.fields = new TreeReader[fieldCount];
+ for(int i=0; i < fieldCount; ++i) {
+ int subtype = type.getSubtypes(i);
+ if (included == null || included[subtype]) {
+ this.fields[i] = createTreeReader(path, subtype, types, included);
+ }
+ }
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ tags.seek(index[columnId]);
+ // NOTE(review): unlike StructTreeReader, no null check on kids here --
+ // looks like an NPE if a union branch was excluded; confirm callers.
+ for(TreeReader kid: fields) {
+ kid.seek(index);
+ }
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ OrcUnion result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new OrcUnion();
+ } else {
+ result = (OrcUnion) previous;
+ }
+ byte tag = tags.next();
+ Object previousVal = result.getObject();
+ // reuse the previous value only if it belonged to the same branch
+ result.set(tag, fields[tag].next(tag == result.getTag() ?
+ previousVal : null));
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)));
+ for(TreeReader field: fields) {
+ if (field != null) {
+ field.startStripe(streams, encodings);
+ }
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // tally how many skipped rows belong to each branch, then skip each
+ // child by its own count
+ long[] counts = new long[fields.length];
+ for(int i=0; i < items; ++i) {
+ counts[tags.next()] += 1;
+ }
+ for(int i=0; i < counts.length; ++i) {
+ fields[i].skipRows(counts[i]);
+ }
+ }
+ }
+
+ // Reads LIST columns: per-row element counts in LENGTH; elements come
+ // from a single child reader.
+ private static class ListTreeReader extends TreeReader {
+ private final TreeReader elementReader;
+ private RunLengthIntegerReader lengths;
+
+ ListTreeReader(Path path, int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(path, columnId);
+ OrcProto.Type type = types.get(columnId);
+ elementReader = createTreeReader(path, type.getSubtypes(0), types,
+ included);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ lengths.seek(index[columnId]);
+ elementReader.seek(index);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ List<Object> result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new ArrayList<Object>();
+ } else {
+ result = (ArrayList<Object>) previous;
+ }
+ int prevLength = result.size();
+ int length = (int) lengths.next();
+ // extend the list to the new length
+ for(int i=prevLength; i < length; ++i) {
+ result.add(null);
+ }
+ // read the new elements into the array, reusing old slots where
+ // possible
+ for(int i=0; i< length; i++) {
+ result.set(i, elementReader.next(i < prevLength ?
+ result.get(i) : null));
+ }
+ // remove any extra elements
+ for(int i=prevLength - 1; i >= length; --i) {
+ result.remove(i);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ if (elementReader != null) {
+ elementReader.startStripe(streams, encodings);
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // total up element counts, then skip the child reader once
+ long childSkip = 0;
+ for(long i=0; i < items; ++i) {
+ childSkip += lengths.next();
+ }
+ elementReader.skipRows(childSkip);
+ }
+ }
+
+ // Reads MAP columns: per-row entry counts in LENGTH; keys and values
+ // come from two child readers (either may be null if excluded).
+ private static class MapTreeReader extends TreeReader {
+ private final TreeReader keyReader;
+ private final TreeReader valueReader;
+ private RunLengthIntegerReader lengths;
+
+ MapTreeReader(Path path,
+ int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included) throws IOException {
+ super(path, columnId);
+ OrcProto.Type type = types.get(columnId);
+ int keyColumn = type.getSubtypes(0);
+ int valueColumn = type.getSubtypes(1);
+ if (included == null || included[keyColumn]) {
+ keyReader = createTreeReader(path, keyColumn, types, included);
+ } else {
+ keyReader = null;
+ }
+ if (included == null || included[valueColumn]) {
+ valueReader = createTreeReader(path, valueColumn, types, included);
+ } else {
+ valueReader = null;
+ }
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ lengths.seek(index[columnId]);
+ // NOTE(review): no null guard on keyReader/valueReader here, unlike
+ // startStripe below -- confirm seek is never used with pruned children.
+ keyReader.seek(index);
+ valueReader.seek(index);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Map<Object, Object> result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new HashMap<Object, Object>();
+ } else {
+ result = (HashMap<Object, Object>) previous;
+ }
+ // for now just clear and create new objects
+ result.clear();
+ int length = (int) lengths.next();
+ // read the new elements into the array
+ for(int i=0; i< length; i++) {
+ result.put(keyReader.next(null), valueReader.next(null));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
+ if (keyReader != null) {
+ keyReader.startStripe(streams, encodings);
+ }
+ if (valueReader != null) {
+ valueReader.startStripe(streams, encodings);
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ // total up entry counts, then skip both children by the same amount
+ long childSkip = 0;
+ for(long i=0; i < items; ++i) {
+ childSkip += lengths.next();
+ }
+ keyReader.skipRows(childSkip);
+ valueReader.skipRows(childSkip);
+ }
+ }
+
+ private static TreeReader createTreeReader(Path path,
+ int columnId,
+ List<OrcProto.Type> types,
+ boolean[] included
+ ) throws IOException {
+ OrcProto.Type type = types.get(columnId);
+ switch (type.getKind()) {
+ case BOOLEAN:
+ return new BooleanTreeReader(path, columnId);
+ case BYTE:
+ return new ByteTreeReader(path, columnId);
+ case DOUBLE:
+ return new DoubleTreeReader(path, columnId);
+ case FLOAT:
+ return new FloatTreeReader(path, columnId);
+ case SHORT:
+ return new ShortTreeReader(path, columnId);
+ case INT:
+ return new IntTreeReader(path, columnId);
+ case LONG:
+ return new LongTreeReader(path, columnId);
+ case STRING:
+ return new StringTreeReader(path, columnId);
+ case BINARY:
+ return new BinaryTreeReader(path, columnId);
+ case TIMESTAMP:
+ return new TimestampTreeReader(path, columnId);
+ case DATE:
+ return new DateTreeReader(path, columnId);
+ case DECIMAL:
+ return new DecimalTreeReader(path, columnId);
+ case STRUCT:
+ return new StructTreeReader(path, columnId, types, included);
+ case LIST:
+ return new ListTreeReader(path, columnId, types, included);
+ case MAP:
+ return new MapTreeReader(path, columnId, types, included);
+ case UNION:
+ return new UnionTreeReader(path, columnId, types, included);
+ default:
+ throw new IllegalArgumentException("Unsupported type " +
+ type.getKind());
+ }
+ }
+
+ OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+ ) throws IOException {
+ long offset = stripe.getOffset() + stripe.getIndexLength() +
+ stripe.getDataLength();
+ int tailLength = (int) stripe.getFooterLength();
+
+ // read the footer
+ ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+ file.seek(offset);
+ file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+ return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
+ codec, bufferSize));
+ }
+
  /**
   * Load the current stripe into memory: read its footer, pull the needed
   * data sections off disk, wrap them as InStreams keyed by StreamName,
   * and reset the per-stripe row bookkeeping. When a projection is set,
   * contiguous runs of included sections are read with one seek/read and
   * excluded sections are skipped by advancing the offset only.
   */
  private void readStripe() throws IOException {
    StripeInformation stripe = stripes.get(currentStripe);
    stripeFooter = readStripeFooter(stripe);
    long offset = stripe.getOffset();
    streams.clear();

    // if we aren't projecting columns, just read the whole stripe
    if (included == null) {
      byte[] buffer =
        new byte[(int) (stripe.getDataLength())];
      // skip past the index sections straight to the data area
      file.seek(offset + stripe.getIndexLength());
      file.readFully(buffer, 0, buffer.length);
      // carve the single buffer into one view per DATA-area section
      int sectionOffset = 0;
      for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
        if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
          int sectionLength = (int) section.getLength();
          ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
            sectionLength);
          StreamName name = new StreamName(section.getColumn(),
            section.getKind());
          streams.put(name,
            InStream.create(name.toString(), sectionBuffer, codec,
              bufferSize));
          sectionOffset += sectionLength;
        }
      }
    } else {
      List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
      // the index of the current section
      int currentSection = 0;
      // skip over the non-DATA (index) sections at the front of the list
      while (currentSection < streamList.size() &&
        StreamName.getArea(streamList.get(currentSection).getKind()) !=
        StreamName.Area.DATA) {
        currentSection += 1;
      }
      // byte position of the current section relative to the stripe start
      long sectionOffset = stripe.getIndexLength();
      while (currentSection < streamList.size()) {
        int bytes = 0;

        // find the first section that shouldn't be read; everything from
        // currentSection up to (but excluding) it is one contiguous run
        int excluded=currentSection;
        while (excluded < streamList.size() &&
          included[streamList.get(excluded).getColumn()]) {
          bytes += streamList.get(excluded).getLength();
          excluded += 1;
        }

        // actually read the bytes as a big chunk
        if (bytes != 0) {
          byte[] buffer = new byte[bytes];
          file.seek(offset + sectionOffset);
          file.readFully(buffer, 0, bytes);
          sectionOffset += bytes;

          // create the streams for the sections we just read
          // ('bytes' is reused here as the offset within the chunk)
          bytes = 0;
          while (currentSection < excluded) {
            OrcProto.Stream section = streamList.get(currentSection);
            StreamName name =
              new StreamName(section.getColumn(), section.getKind());
            this.streams.put(name,
              InStream.create(name.toString(),
                ByteBuffer.wrap(buffer, bytes,
                  (int) section.getLength()), codec, bufferSize));
            currentSection += 1;
            bytes += section.getLength();
          }
        }

        // skip forward until we get back to a section that we need
        while (currentSection < streamList.size() &&
          !included[streamList.get(currentSection).getColumn()]) {
          sectionOffset += streamList.get(currentSection).getLength();
          currentSection += 1;
        }
      }
    }
    reader.startStripe(streams, stripeFooter.getColumnsList());
    // reset per-stripe position and recompute the absolute row base
    rowInStripe = 0;
    rowCountInStripe = stripe.getNumberOfRows();
    rowBaseInStripe = 0;
    for(int i=0; i < currentStripe; ++i) {
      rowBaseInStripe += stripes.get(i).getNumberOfRows();
    }
    // invalidate cached row indexes; readRowIndex reloads them lazily
    for(int i=0; i < indexes.length; ++i) {
      indexes[i] = null;
    }
  }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+ }
+
+ @Override
+ public Object next(Object previous) throws IOException {
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ readStripe();
+ }
+ rowInStripe += 1;
+ return reader.next(previous);
+ }
+
  @Override
  public void close() throws IOException {
    // Closing the underlying file handle is the only cleanup needed; the
    // streams map just wraps byte buffers already read in readStripe.
    file.close();
  }
+
+ @Override
+ public long getRowNumber() {
+ return rowInStripe + rowBaseInStripe + firstRow;
+ }
+
+ /**
+ * Return the fraction of rows that have been read from the selected.
+ * section of the file
+ * @return fraction between 0.0 and 1.0 of rows consumed
+ */
+ @Override
+ public float getProgress() {
+ return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+ }
+
+ private int findStripe(long rowNumber) {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number " +
+ rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
+ rowNumber);
+ }
+ rowNumber -= firstRow;
+ for(int i=0; i < stripes.size(); i++) {
+ StripeInformation stripe = stripes.get(i);
+ if (stripe.getNumberOfRows() > rowNumber) {
+ return i;
+ }
+ rowNumber -= stripe.getNumberOfRows();
+ }
+ throw new IllegalArgumentException("Seek after the end of reader range");
+ }
+
  /**
   * Load the row index for every included column of the current stripe
   * that hasn't been loaded yet.  Walks the footer's stream list while
   * accumulating offsets, so 'offset' always points at the start of the
   * stream currently being examined.
   */
  private void readRowIndex() throws IOException {
    long offset = stripes.get(currentStripe).getOffset();
    // NOTE(review): assumes the footer lists streams in file order starting
    // at the stripe offset — confirm against the writer.
    for(OrcProto.Stream stream: stripeFooter.getStreamsList()) {
      if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
        int col = stream.getColumn();
        // only load indexes for projected columns, and only once per stripe
        // (readStripe nulls them out when the stripe changes)
        if ((included == null || included[col]) && indexes[col] == null) {
          byte[] buffer = new byte[(int) stream.getLength()];
          file.seek(offset);
          file.readFully(buffer);
          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
              ByteBuffer.wrap(buffer), codec, bufferSize));
        }
      }
      // every stream, read or not, advances the running offset
      offset += stream.getLength();
    }
  }
+
+ private void seekToRowEntry(int rowEntry) throws IOException {
+ PositionProvider[] index = new PositionProvider[indexes.length];
+ for(int i=0; i < indexes.length; ++i) {
+ if (indexes[i] != null) {
+ index[i]=
+ new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+ }
+ }
+ reader.seek(index);
+ }
+
  /**
   * Seek so that the next call to next() returns the given absolute row.
   * Loads the containing stripe if it isn't the current one, then uses the
   * row index (when a stride is configured) to jump close to the target
   * before skipping the remaining rows.
   */
  @Override
  public void seekToRow(long rowNumber) throws IOException {
    int rightStripe = findStripe(rowNumber);
    if (rightStripe != currentStripe) {
      currentStripe = rightStripe;
      readStripe();
    }
    readRowIndex();
    rowInStripe = rowNumber - rowBaseInStripe - firstRow;
    if (rowIndexStride != 0) {
      // jump to the closest preceding index entry, then skip forward the
      // leftover rows within that stride
      long entry = rowInStripe / rowIndexStride;
      seekToRowEntry((int) entry);
      reader.skipRows(rowInStripe - entry * rowIndexStride);
    } else {
      // NOTE(review): with no row index, skipRows counts appear relative to
      // the stripe start; when seeking within the already-loaded stripe the
      // reader position is not reset first — confirm only forward seeks (or
      // cross-stripe seeks) reach this branch.
      reader.skipRows(rowInStripe);
    }
  }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Mon Aug 12 15:03:30 2013
@@ -23,7 +23,7 @@ import java.io.IOException;
/**
* A reader that reads a sequence of integers.
* */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
private final InStream input;
private final boolean signed;
private final long[] literals =
@@ -71,11 +71,13 @@ class RunLengthIntegerReader {
}
}
- boolean hasNext() throws IOException {
+ @Override
+ public boolean hasNext() throws IOException {
return used != numLiterals || input.available() > 0;
}
- long next() throws IOException {
+ @Override
+ public long next() throws IOException {
long result;
if (used == numLiterals) {
readValues();
@@ -88,7 +90,8 @@ class RunLengthIntegerReader {
return result;
}
- void seek(PositionProvider index) throws IOException {
+ @Override
+ public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
if (consumed != 0) {
@@ -104,7 +107,8 @@ class RunLengthIntegerReader {
}
}
- void skip(long numValues) throws IOException {
+ @Override
+ public void skip(long numValues) throws IOException {
while (numValues > 0) {
if (used == numLiterals) {
readValues();