You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/18 00:06:14 UTC
[2/3] hive git commit: HIVE-10535 : LLAP: move
EncodedTreeReaderFactory, TreeReaderFactory bits that rely on orc.encoded,
and StreamUtils if needed, to orc.encoded package (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 8295929..328e4e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -64,17 +63,12 @@ public class TreeReaderFactory {
protected final int columnId;
protected BitFieldReader present = null;
protected boolean valuePresent = false;
- protected ColumnStreamData presentStreamBuffer = null;
- protected ColumnStreamData dataStreamBuffer = null;
- protected ColumnStreamData dictionaryStreamBuffer = null;
- protected ColumnStreamData lengthsStreamBuffer = null;
- protected ColumnStreamData secondaryStreamBuffer = null;
TreeReader(int columnId) throws IOException {
this(columnId, null);
}
- TreeReader(int columnId, InStream in) throws IOException {
+ protected TreeReader(int columnId, InStream in) throws IOException {
this.columnId = columnId;
if (in == null) {
present = null;
@@ -91,7 +85,7 @@ public class TreeReaderFactory {
}
}
- IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+ static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
InStream in,
boolean signed, boolean skipCorrupt) throws IOException {
switch (kind) {
@@ -136,37 +130,6 @@ public class TreeReaderFactory {
}
}
- public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
- throws IOException {
- // stream buffers are arranged in enum order of stream kind
- for (ColumnStreamData streamBuffer : buffers) {
- switch (streamBuffer.getStreamKind()) {
- case 0:
- // PRESENT stream
- presentStreamBuffer = streamBuffer;
- break;
- case 1:
- // DATA stream
- dataStreamBuffer = streamBuffer;
- break;
- case 2:
- // LENGTH stream
- lengthsStreamBuffer = streamBuffer;
- break;
- case 3:
- // DICTIONARY_DATA stream
- dictionaryStreamBuffer = streamBuffer;
- break;
- case 5:
- // SECONDARY stream
- secondaryStreamBuffer = streamBuffer;
- break;
- default:
- throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind());
- }
- }
- }
-
protected long countNonNulls(long rows) throws IOException {
if (present != null) {
long result = 0;
@@ -222,16 +185,20 @@ public class TreeReaderFactory {
}
return previousVector;
}
+
+ public BitFieldReader getPresent() {
+ return present;
+ }
}
- protected static class BooleanTreeReader extends TreeReader {
+ public static class BooleanTreeReader extends TreeReader {
protected BitFieldReader reader = null;
BooleanTreeReader(int columnId) throws IOException {
this(columnId, null, null);
}
- BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
+ protected BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present);
if (data != null) {
reader = new BitFieldReader(data, 1);
@@ -296,14 +263,14 @@ public class TreeReaderFactory {
}
}
- protected static class ByteTreeReader extends TreeReader {
+ public static class ByteTreeReader extends TreeReader {
protected RunLengthByteReader reader = null;
ByteTreeReader(int columnId) throws IOException {
this(columnId, null, null);
}
- ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
+ protected ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present);
this.reader = new RunLengthByteReader(data);
}
@@ -366,14 +333,14 @@ public class TreeReaderFactory {
}
}
- protected static class ShortTreeReader extends TreeReader {
+ public static class ShortTreeReader extends TreeReader {
protected IntegerReader reader = null;
ShortTreeReader(int columnId) throws IOException {
this(columnId, null, null, null);
}
- ShortTreeReader(int columnId, InStream present, InStream data,
+ protected ShortTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding)
throws IOException {
super(columnId, present);
@@ -452,14 +419,14 @@ public class TreeReaderFactory {
}
}
- protected static class IntTreeReader extends TreeReader {
+ public static class IntTreeReader extends TreeReader {
protected IntegerReader reader = null;
IntTreeReader(int columnId) throws IOException {
this(columnId, null, null, null);
}
- IntTreeReader(int columnId, InStream present, InStream data,
+ protected IntTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding)
throws IOException {
super(columnId, present);
@@ -538,14 +505,14 @@ public class TreeReaderFactory {
}
}
- protected static class LongTreeReader extends TreeReader {
+ public static class LongTreeReader extends TreeReader {
protected IntegerReader reader = null;
LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
this(columnId, null, null, null, skipCorrupt);
}
- LongTreeReader(int columnId, InStream present, InStream data,
+ protected LongTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding,
boolean skipCorrupt)
throws IOException {
@@ -625,7 +592,7 @@ public class TreeReaderFactory {
}
}
- protected static class FloatTreeReader extends TreeReader {
+ public static class FloatTreeReader extends TreeReader {
protected InStream stream;
private final SerializationUtils utils;
@@ -633,7 +600,7 @@ public class TreeReaderFactory {
this(columnId, null, null);
}
- FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
+ protected FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present);
this.utils = new SerializationUtils();
this.stream = data;
@@ -737,7 +704,7 @@ public class TreeReaderFactory {
}
}
- protected static class DoubleTreeReader extends TreeReader {
+ public static class DoubleTreeReader extends TreeReader {
protected InStream stream;
private final SerializationUtils utils;
@@ -745,7 +712,7 @@ public class TreeReaderFactory {
this(columnId, null, null);
}
- DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
+ protected DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present);
this.utils = new SerializationUtils();
this.stream = data;
@@ -852,7 +819,7 @@ public class TreeReaderFactory {
}
}
- protected static class BinaryTreeReader extends TreeReader {
+ public static class BinaryTreeReader extends TreeReader {
protected InStream stream;
protected IntegerReader lengths = null;
protected final LongColumnVector scratchlcv;
@@ -861,7 +828,7 @@ public class TreeReaderFactory {
this(columnId, null, null, null, null);
}
- BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
+ protected BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present);
scratchlcv = new LongColumnVector();
@@ -959,7 +926,7 @@ public class TreeReaderFactory {
}
}
- protected static class TimestampTreeReader extends TreeReader {
+ public static class TimestampTreeReader extends TreeReader {
protected IntegerReader data = null;
protected IntegerReader nanos = null;
private final boolean skipCorrupt;
@@ -973,7 +940,7 @@ public class TreeReaderFactory {
this(columnId, null, null, null, null, skipCorrupt);
}
- TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
+ protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt)
throws IOException {
super(columnId, presentStream);
@@ -1144,14 +1111,14 @@ public class TreeReaderFactory {
}
}
- protected static class DateTreeReader extends TreeReader {
+ public static class DateTreeReader extends TreeReader {
protected IntegerReader reader = null;
DateTreeReader(int columnId) throws IOException {
this(columnId, null, null, null);
}
- DateTreeReader(int columnId, InStream present, InStream data,
+ protected DateTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present);
if (data != null && encoding != null) {
@@ -1229,7 +1196,7 @@ public class TreeReaderFactory {
}
}
- protected static class DecimalTreeReader extends TreeReader {
+ public static class DecimalTreeReader extends TreeReader {
protected InStream valueStream;
protected IntegerReader scaleReader = null;
private LongColumnVector scratchScaleVector;
@@ -1241,7 +1208,7 @@ public class TreeReaderFactory {
this(columnId, precision, scale, null, null, null, null);
}
- DecimalTreeReader(int columnId, int precision, int scale, InStream present,
+ protected DecimalTreeReader(int columnId, int precision, int scale, InStream present,
InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding)
throws IOException {
super(columnId, present);
@@ -1363,14 +1330,14 @@ public class TreeReaderFactory {
* stripe, it creates an internal reader based on whether a direct or
* dictionary encoding was used.
*/
- protected static class StringTreeReader extends TreeReader {
+ public static class StringTreeReader extends TreeReader {
protected TreeReader reader;
StringTreeReader(int columnId) throws IOException {
super(columnId);
}
- StringTreeReader(int columnId, InStream present, InStream data, InStream length,
+ protected StringTreeReader(int columnId, InStream present, InStream data, InStream length,
InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present);
if (encoding != null) {
@@ -1521,7 +1488,7 @@ public class TreeReaderFactory {
* A reader for string columns that are direct encoded in the current
* stripe.
*/
- protected static class StringDirectTreeReader extends TreeReader {
+ public static class StringDirectTreeReader extends TreeReader {
protected InStream stream;
protected IntegerReader lengths;
private final LongColumnVector scratchlcv;
@@ -1530,8 +1497,8 @@ public class TreeReaderFactory {
this(columnId, null, null, null, null);
}
- StringDirectTreeReader(int columnId, InStream present, InStream data, InStream length,
- OrcProto.ColumnEncoding.Kind encoding) throws IOException {
+ protected StringDirectTreeReader(int columnId, InStream present, InStream data,
+ InStream length, OrcProto.ColumnEncoding.Kind encoding) throws IOException {
super(columnId, present);
this.scratchlcv = new LongColumnVector();
this.stream = data;
@@ -1628,13 +1595,21 @@ public class TreeReaderFactory {
lengthToSkip -= stream.skip(lengthToSkip);
}
}
+
+ public IntegerReader getLengths() {
+ return lengths;
+ }
+
+ public InStream getStream() {
+ return stream;
+ }
}
/**
* A reader for string columns that are dictionary encoded in the current
* stripe.
*/
- protected static class StringDictionaryTreeReader extends TreeReader {
+ public static class StringDictionaryTreeReader extends TreeReader {
private DynamicByteArray dictionaryBuffer;
private int[] dictionaryOffsets;
protected IntegerReader reader;
@@ -1646,7 +1621,7 @@ public class TreeReaderFactory {
this(columnId, null, null, null, null, null);
}
- StringDictionaryTreeReader(int columnId, InStream present, InStream data,
+ protected StringDictionaryTreeReader(int columnId, InStream present, InStream data,
InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding)
throws IOException {
super(columnId, present);
@@ -1839,16 +1814,20 @@ public class TreeReaderFactory {
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ public IntegerReader getReader() {
+ return reader;
+ }
}
- protected static class CharTreeReader extends StringTreeReader {
+ public static class CharTreeReader extends StringTreeReader {
int maxLength;
CharTreeReader(int columnId, int maxLength) throws IOException {
this(columnId, maxLength, null, null, null, null, null);
}
- CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+ protected CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present, data, length, dictionary, encoding);
this.maxLength = maxLength;
@@ -1915,14 +1894,14 @@ public class TreeReaderFactory {
}
}
- protected static class VarcharTreeReader extends StringTreeReader {
+ public static class VarcharTreeReader extends StringTreeReader {
int maxLength;
VarcharTreeReader(int columnId, int maxLength) throws IOException {
this(columnId, maxLength, null, null, null, null, null);
}
- VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+ protected VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present, data, length, dictionary, encoding);
this.maxLength = maxLength;
@@ -1987,11 +1966,11 @@ public class TreeReaderFactory {
}
}
- protected static class StructTreeReader extends TreeReader {
+ public static class StructTreeReader extends TreeReader {
protected final TreeReader[] fields;
private final String[] fieldNames;
- StructTreeReader(int columnId,
+ protected StructTreeReader(int columnId,
List<OrcProto.Type> types,
boolean[] included,
boolean skipCorrupt) throws IOException {
@@ -2090,11 +2069,11 @@ public class TreeReaderFactory {
}
}
- protected static class UnionTreeReader extends TreeReader {
+ public static class UnionTreeReader extends TreeReader {
protected final TreeReader[] fields;
protected RunLengthByteReader tags;
- UnionTreeReader(int columnId,
+ protected UnionTreeReader(int columnId,
List<OrcProto.Type> types,
boolean[] included,
boolean skipCorrupt) throws IOException {
@@ -2170,11 +2149,11 @@ public class TreeReaderFactory {
}
}
- protected static class ListTreeReader extends TreeReader {
+ public static class ListTreeReader extends TreeReader {
protected final TreeReader elementReader;
protected IntegerReader lengths = null;
- ListTreeReader(int columnId,
+ protected ListTreeReader(int columnId,
List<OrcProto.Type> types,
boolean[] included,
boolean skipCorrupt) throws IOException {
@@ -2259,12 +2238,12 @@ public class TreeReaderFactory {
}
}
- protected static class MapTreeReader extends TreeReader {
+ public static class MapTreeReader extends TreeReader {
protected final TreeReader keyReader;
protected final TreeReader valueReader;
protected IntegerReader lengths = null;
- MapTreeReader(int columnId,
+ protected MapTreeReader(int columnId,
List<OrcProto.Type> types,
boolean[] included,
boolean skipCorrupt) throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
index b3c9169..00dcc15 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/CacheChunk.java
@@ -35,7 +35,7 @@ public class CacheChunk extends DiskRangeList {
@Override
public ByteBuffer getData() {
- // Callers duplicate the buffer, they have to for BufferChunk
+ // Callers duplicate the buffer, they have to for BufferChunk; so we don't have to.
return buffer.getByteBufferRaw();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b18db4f4/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index ce503d9..b1477e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -348,7 +348,7 @@ class EncodedReaderImpl implements EncodedReader {
ColumnReadContext ctx = colCtxs[colIxMod];
RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
- ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
+ ecb.initColumn(colIxMod, ctx.colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
ColumnStreamData cb = null;
@@ -360,7 +360,6 @@ class EncodedReaderImpl implements EncodedReader {
}
if (sctx.stripeLevelStream == null) {
sctx.stripeLevelStream = POOLS.csdPool.take();
- sctx.stripeLevelStream.init(sctx.kind.getNumber());
// We will be using this for each RG while also sending RGs to processing.
// To avoid buffers being unlocked, run refcount one ahead; we will not increase
// it when building the last RG, so each RG processing will decref once, and the
@@ -399,7 +398,7 @@ class EncodedReaderImpl implements EncodedReader {
sctx.bufferIter = iter = lastCached;
}
}
- ecb.setStreamData(colIxMod, streamIx, cb);
+ ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb);
}
}
if (isRGSelected) {
@@ -431,9 +430,7 @@ class EncodedReaderImpl implements EncodedReader {
private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg,
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
- ColumnStreamData cb;
- cb = POOLS.csdPool.take();
- cb.init(sctx.kind.getNumber());
+ ColumnStreamData cb = POOLS.csdPool.take();
cb.incRef();
if (isDebugTracingEnabled) {
LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")