Posted to commits@hive.apache.org by se...@apache.org on 2017/02/10 02:36:40 UTC
[1/3] hive git commit: HIVE-15831 : LLAP: Fix a problem of the output of LlapDump (Takanobu Asanuma, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 050e679df -> 8f273cc53
HIVE-15831 : LLAP: Fix a problem of the output of LlapDump (Takanobu Asanuma, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fbe9b05f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fbe9b05f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fbe9b05f
Branch: refs/heads/master
Commit: fbe9b05f00bc54f012754a1eeb4f404bea3a69e7
Parents: 050e679
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Feb 9 18:24:14 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Feb 9 18:24:14 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/llap/LlapDump.java | 30 +++-----------------
1 file changed, 4 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fbe9b05f/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
index 08ad1f5..4a83141 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapDump.java
@@ -17,17 +17,7 @@
*/
package org.apache.hadoop.hive.llap;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
@@ -39,23 +29,10 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowInputFormat;
-import org.apache.hadoop.hive.llap.LlapRowRecordReader;
-import org.apache.hadoop.hive.llap.Row;
-import org.apache.hadoop.hive.llap.Schema;
/**
* Utility to test query and data retrieval via the LLAP input format.
@@ -160,10 +137,11 @@ public class LlapDump {
private static void printRow(Row row) {
Schema schema = row.getSchema();
StringBuilder sb = new StringBuilder();
- for (int idx = 0; idx < schema.getColumns().size(); ++idx) {
- if (idx > 0) {
+ int length = schema.getColumns().size();
+ for (int idx = 0; idx < length; ++idx) {
+ sb.append(row.getValue(idx));
+ if (idx != length - 1) {
sb.append(", ");
- sb.append(row.getValue(idx));
}
}
System.out.println(sb.toString());
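The bug being fixed: the old loop appended a value only inside the idx > 0 guard, so the first column was never printed and every remaining value trailed its separator. A minimal standalone sketch of the corrected join logic (formatRow and its List argument are illustrative stand-ins, not the actual LlapDump API):

    // Append the separator between values, not before them.
    static String formatRow(java.util.List<Object> values) {
      StringBuilder sb = new StringBuilder();
      int length = values.size();
      for (int idx = 0; idx < length; ++idx) {
        sb.append(values.get(idx));      // always emit the value first
        if (idx != length - 1) {
          sb.append(", ");               // then a separator, except after the last
        }
      }
      return sb.toString();
    }

Called with [1, 2, 3] this yields "1, 2, 3"; the old logic would have produced ", 2, 3".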
[2/3] hive git commit: HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)
Posted by se...@apache.org.
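Every stream reader touched below follows the same three-part pattern: when the builder supplies cached ColumnVectors (the SerDe text-cache case), nextVector hands out one pre-read vector per batch via shallowCopyTo and reverts to stream decoding once the list is exhausted; seek becomes a no-op while vectors remain; and setBuffers asserts vectors == null, since streams are not created at all in the vector case. A condensed sketch of the pattern with illustrative stand-in types (Vector and CachedVectorReader are not the real TreeReaderFactory hierarchy):

    import java.io.IOException;
    import java.util.List;

    interface Vector { void shallowCopyTo(Vector other); }

    abstract class CachedVectorReader {
      private List<Vector> vectors;  // non-null only when data arrived from the cache as vectors
      private int vectorIndex = 0;

      CachedVectorReader(List<Vector> vectors) { this.vectors = vectors; }

      // Normal path: decode the next batch from the column streams.
      protected abstract void decodeFromStreams(Vector previous, int batchSize) throws IOException;

      public void nextVector(Vector previous, int batchSize) throws IOException {
        if (vectors == null) {               // no cached vectors: decode as usual
          decodeFromStreams(previous, batchSize);
          return;
        }
        // Cached path: surface the next pre-read vector; one vector per batch.
        vectors.get(vectorIndex++).shallowCopyTo(previous);
        if (vectorIndex == vectors.size()) {
          vectors = null;                    // cache exhausted; later calls decode streams
        }
      }

      public void seek() throws IOException {
        if (vectors != null) return;         // stream positions are meaningless for cached vectors
        // ... per-stream seek logic elided ...
      }
    }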
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index a434763..c21327f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -18,14 +18,14 @@
package org.apache.hadoop.hive.ql.io.orc.encoded;
import org.apache.orc.impl.RunLengthByteReader;
-import org.apache.orc.impl.SchemaEvolution;
-import org.apache.orc.impl.StreamName;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.orc.CompressionCodec;
import org.apache.orc.TypeDescription;
import org.apache.orc.TypeDescription.Category;
@@ -54,20 +54,40 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _secondsStream;
private SettableUncompressedStream _nanosStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private TimestampStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, SettableUncompressedStream nanos,
boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
- TreeReaderFactory.Context context) throws IOException {
+ TreeReaderFactory.Context context,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data, nanos, encoding, context);
this.isFileCompressed = isFileCompressed;
this._presentStream = present;
this._secondsStream = data;
this._nanosStream = nanos;
+ this.vectors = vectors;
+ }
+
+ @Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ // Note: we assume that batchSize will be consistent with vectors passed in.
+ // This is rather brittle; same in other readers.
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (isFileCompressed) {
index.getNext();
@@ -95,6 +115,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
@Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ // The situation here and in other readers is currently as such - setBuffers is never called
+ // in SerDe reader case, and SerDe reader case is the only one that uses vector-s.
+ // When the readers are created with vectors, streams are actually not created at all.
+ // So, if we could have a set of vectors, then set of buffers, we'd be in trouble here;
+ // we may need to implement that if this scenario is ever supported.
+ assert vectors == null;
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -119,6 +145,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
private TreeReaderFactory.Context context;
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
@@ -170,7 +197,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new TimestampStreamReader(columnIndex, present, data, nanos,
- isFileCompressed, columnEncoding, context);
+ isFileCompressed, columnEncoding, context, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -187,12 +219,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private SettableUncompressedStream _dataStream;
private SettableUncompressedStream _lengthStream;
private SettableUncompressedStream _dictionaryStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private StringStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, SettableUncompressedStream length,
SettableUncompressedStream dictionary,
boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
- TreeReaderFactory.Context context) throws IOException {
+ TreeReaderFactory.Context context, List<ColumnVector> vectors) throws IOException {
super(columnId, present, data, length, dictionary, encoding, context);
this._isDictionaryEncoding = dictionary != null;
this._isFileCompressed = isFileCompressed;
@@ -200,6 +234,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
this._dataStream = data;
this._lengthStream = length;
this._dictionaryStream = dictionary;
+ this.vectors = vectors;
}
@Override
@@ -210,6 +245,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -250,8 +286,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -284,9 +334,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
-
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
return this;
@@ -345,7 +395,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new StringStreamReader(columnIndex, present, data, length, dictionary,
- isFileCompressed, columnEncoding, context);
+ isFileCompressed, columnEncoding, context, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -359,19 +414,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private ShortStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, boolean isFileCompressed,
- OrcProto.ColumnEncoding encoding,
- TreeReaderFactory.Context context) throws IOException {
+ OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data, encoding, context);
this.isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (isFileCompressed) {
index.getNext();
@@ -390,8 +449,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -407,6 +480,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -450,8 +524,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new ShortStreamReader(columnIndex, present, data, isFileCompressed,
- columnEncoding, context);
+ columnEncoding, context, vectors);
}
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
+ }
+
}
public static StreamReaderBuilder builder() {
@@ -463,19 +543,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean _isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private LongStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, boolean isFileCompressed,
- OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
- ) throws IOException {
+ OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data, encoding, context);
this._isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -494,8 +578,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -512,6 +610,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
private TreeReaderFactory.Context context;
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -555,8 +654,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new LongStreamReader(columnIndex, present, data, isFileCompressed,
- columnEncoding, context);
+ columnEncoding, context, vectors);
}
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
+ }
+
}
public static StreamReaderBuilder builder() {
@@ -568,19 +673,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean _isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private IntStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, boolean isFileCompressed,
- OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
- ) throws IOException {
+ OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data, encoding, context);
this._isFileCompressed = isFileCompressed;
this._dataStream = data;
this._presentStream = present;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -599,8 +708,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -616,6 +739,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -659,8 +783,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new IntStreamReader(columnIndex, present, data, isFileCompressed,
- columnEncoding, context);
+ columnEncoding, context, vectors);
}
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
+ }
+
}
public static StreamReaderBuilder builder() {
@@ -673,17 +803,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean _isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private FloatStreamReader(int columnId, SettableUncompressedStream present,
- SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+ SettableUncompressedStream data, boolean isFileCompressed,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data);
this._isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -702,8 +837,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -718,7 +867,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData presentStream;
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
-
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
@@ -750,7 +899,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
dataStream);
boolean isFileCompressed = compressionCodec != null;
- return new FloatStreamReader(columnIndex, present, data, isFileCompressed);
+ return new FloatStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -764,17 +918,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean _isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private DoubleStreamReader(int columnId, SettableUncompressedStream present,
- SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+ SettableUncompressedStream data, boolean isFileCompressed,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data);
this._isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -793,8 +952,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -809,6 +982,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData presentStream;
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -846,7 +1020,13 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
dataStream);
boolean isFileCompressed = compressionCodec != null;
- return new DoubleStreamReader(columnIndex, present, data, isFileCompressed);
+ // TODO: why doesn't this use context?
+ return new DoubleStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -860,22 +1040,26 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _valueStream;
private SettableUncompressedStream _scaleStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private DecimalStreamReader(int columnId, int precision, int scale,
SettableUncompressedStream presentStream,
SettableUncompressedStream valueStream, SettableUncompressedStream scaleStream,
boolean isFileCompressed,
- OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
- ) throws IOException {
+ OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, presentStream, valueStream, scaleStream, encoding, context);
this._isFileCompressed = isFileCompressed;
this._presentStream = presentStream;
this._valueStream = valueStream;
this._scaleStream = scaleStream;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -901,8 +1085,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -924,9 +1122,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private int precision;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
-
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
return this;
@@ -985,7 +1183,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new DecimalStreamReader(columnIndex, precision, scale, presentInStream,
valueInStream,
- scaleInStream, isFileCompressed, columnEncoding, context);
+ scaleInStream, isFileCompressed, columnEncoding, context, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -998,19 +1201,23 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private DateStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, boolean isFileCompressed,
- OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context
+ OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context, List<ColumnVector> vectors
) throws IOException {
super(columnId, present, data, encoding, context);
this.isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (isFileCompressed) {
index.getNext();
@@ -1029,8 +1236,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1046,6 +1267,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -1078,6 +1300,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
+ }
+
public DateStreamReader build() throws IOException {
SettableUncompressedStream present = StreamUtils
.createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(),
@@ -1090,7 +1317,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new DateStreamReader(columnIndex, present, data, isFileCompressed,
- columnEncoding, context);
+ columnEncoding, context, vectors);
}
}
@@ -1106,11 +1333,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private SettableUncompressedStream _dataStream;
private SettableUncompressedStream _lengthStream;
private SettableUncompressedStream _dictionaryStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private CharStreamReader(int columnId, int maxLength,
SettableUncompressedStream present, SettableUncompressedStream data,
SettableUncompressedStream length, SettableUncompressedStream dictionary,
- boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
+ boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, maxLength, present, data, length,
dictionary, encoding);
this._isDictionaryEncoding = dictionary != null;
@@ -1119,6 +1349,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
this._dataStream = data;
this._lengthStream = length;
this._dictionaryStream = dictionary;
+ this.vectors = vectors;
}
@Override
@@ -1129,6 +1360,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -1169,8 +1401,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1204,7 +1450,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
-
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
@@ -1264,7 +1510,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new CharStreamReader(columnIndex, maxLength, present, data, length,
- dictionary, isFileCompressed, columnEncoding);
+ dictionary, isFileCompressed, columnEncoding, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -1281,11 +1532,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private SettableUncompressedStream _dataStream;
private SettableUncompressedStream _lengthStream;
private SettableUncompressedStream _dictionaryStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private VarcharStreamReader(int columnId, int maxLength,
SettableUncompressedStream present, SettableUncompressedStream data,
SettableUncompressedStream length, SettableUncompressedStream dictionary,
- boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException {
+ boolean isFileCompressed, OrcProto.ColumnEncoding encoding,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, maxLength, present, data, length,
dictionary, encoding);
this._isDictionaryEncoding = dictionary != null;
@@ -1294,6 +1548,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
this._dataStream = data;
this._lengthStream = length;
this._dictionaryStream = dictionary;
+ this.vectors = vectors;
}
@Override
@@ -1304,6 +1559,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -1344,8 +1600,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1379,7 +1649,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
-
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
@@ -1439,7 +1709,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new VarcharStreamReader(columnIndex, maxLength, present, data, length,
- dictionary, isFileCompressed, columnEncoding);
+ dictionary, isFileCompressed, columnEncoding, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -1453,17 +1728,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean _isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private ByteStreamReader(int columnId, SettableUncompressedStream present,
- SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+ SettableUncompressedStream data, boolean isFileCompressed,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data);
this._isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -1482,8 +1762,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1498,7 +1792,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData presentStream;
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
-
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
@@ -1530,7 +1824,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
dataStream);
boolean isFileCompressed = compressionCodec != null;
- return new ByteStreamReader(columnIndex, present, data, isFileCompressed);
+ return new ByteStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -1544,20 +1843,37 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
private SettableUncompressedStream _lengthsStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private BinaryStreamReader(int columnId, SettableUncompressedStream present,
SettableUncompressedStream data, SettableUncompressedStream length,
boolean isFileCompressed,
- OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context) throws IOException {
+ OrcProto.ColumnEncoding encoding, TreeReaderFactory.Context context, List<ColumnVector> vectors) throws IOException {
super(columnId, present, data, length, encoding, context);
this._isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
this._lengthsStream = length;
+ this.vectors = vectors;
+ }
+
+ @Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -1585,6 +1901,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
@Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1604,6 +1921,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
+ private List<ColumnVector> vectors;
private TreeReaderFactory.Context context;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
@@ -1653,7 +1971,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
boolean isFileCompressed = compressionCodec != null;
return new BinaryStreamReader(columnIndex, present, data, length, isFileCompressed,
- columnEncoding, context);
+ columnEncoding, context, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -1666,17 +1989,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private boolean _isFileCompressed;
private SettableUncompressedStream _presentStream;
private SettableUncompressedStream _dataStream;
+ private List<ColumnVector> vectors;
+ private int vectorIndex = 0;
private BooleanStreamReader(int columnId, SettableUncompressedStream present,
- SettableUncompressedStream data, boolean isFileCompressed) throws IOException {
+ SettableUncompressedStream data, boolean isFileCompressed,
+ List<ColumnVector> vectors) throws IOException {
super(columnId, present, data);
this._isFileCompressed = isFileCompressed;
this._presentStream = present;
this._dataStream = data;
+ this.vectors = vectors;
}
@Override
public void seek(PositionProvider index) throws IOException {
+ if (vectors != null) return;
if (present != null) {
if (_isFileCompressed) {
index.getNext();
@@ -1695,8 +2023,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
+ public void nextVector(
+ ColumnVector previousVector, boolean[] isNull, int batchSize) throws IOException {
+ if (vectors == null) {
+ super.nextVector(previousVector, isNull, batchSize);
+ return;
+ }
+ vectors.get(vectorIndex++).shallowCopyTo(previousVector);
+ if (vectorIndex == vectors.size()) {
+ vectors = null;
+ }
+ }
+
+ @Override
public void setBuffers(EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe)
throws IOException {
+ assert vectors == null; // See the comment in TimestampStreamReader.setBuffers.
ColumnStreamData[] streamsData = batch.getColumnData(columnId);
if (_presentStream != null) {
_presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE]));
@@ -1711,7 +2053,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private ColumnStreamData presentStream;
private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
-
+ private List<ColumnVector> vectors;
public StreamReaderBuilder setColumnIndex(int columnIndex) {
this.columnIndex = columnIndex;
@@ -1743,7 +2085,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
dataStream);
boolean isFileCompressed = compressionCodec != null;
- return new BooleanStreamReader(columnIndex, present, data, isFileCompressed);
+ return new BooleanStreamReader(columnIndex, present, data, isFileCompressed, vectors);
+ }
+
+ public StreamReaderBuilder setVectors(List<ColumnVector> vectors) {
+ this.vectors = vectors;
+ return this;
}
}
@@ -1753,7 +2100,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
public static StructTreeReader createRootTreeReader(TypeDescription schema,
- List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+ List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
CompressionCodec codec, TreeReaderFactory.Context context, int[] columnMapping)
throws IOException {
if (schema.getCategory() != Category.STRUCT) {
@@ -1763,7 +2110,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
List<TypeDescription> children = schema.getChildren();
int childCount = children.size(), includedCount = 0;
for (int childIx = 0; childIx < childCount; ++childIx) {
- if (!batch.hasData(children.get(childIx).getId())) {
+ int batchColIx = children.get(childIx).getId();
+ if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Column at " + childIx + " " + children.get(childIx).getId()
+ ":" + children.get(childIx).toString() + " has no data");
@@ -1774,7 +2122,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
TreeReader[] childReaders = new TreeReader[includedCount];
for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) {
- if (!batch.hasData(children.get(schemaChildIx).getId())) continue;
+ int batchColIx = children.get(schemaChildIx).getId();
+ if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) continue;
childReaders[++inclChildIx] = createEncodedTreeReader(
schema.getChildren().get(schemaChildIx), encodings, batch, codec, context);
columnMapping[inclChildIx] = schemaChildIx;
@@ -1790,10 +2139,18 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private static TreeReader createEncodedTreeReader(TypeDescription schema,
- List<OrcProto.ColumnEncoding> encodings, EncodedColumnBatch<OrcBatchKey> batch,
+ List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
CompressionCodec codec, TreeReaderFactory.Context context) throws IOException {
- int columnIndex = schema.getId();
- ColumnStreamData[] streamBuffers = batch.getColumnData(columnIndex);
+ int columnIndex = schema.getId();
+ ColumnStreamData[] streamBuffers = null;
+ List<ColumnVector> vectors = null;
+ if (batch.hasData(columnIndex)) {
+ streamBuffers = batch.getColumnData(columnIndex);
+ } else if (batch.hasVectors(columnIndex)) {
+ vectors = batch.getColumnVectors(columnIndex);
+ } else {
+ throw new AssertionError("Batch has no data for " + columnIndex + ": " + batch);
+ }
// EncodedColumnBatch is already decompressed, we don't really need to pass codec.
// But we need to know if the original data is compressed or not. This is used to skip
@@ -1804,20 +2161,26 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
// stream buffers are arranged in enum order of stream kind
- ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE],
- data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE],
- dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE],
- lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE],
+ ColumnStreamData present = null, data = null, dictionary = null,
+ lengths = null, secondary = null;
+ if (streamBuffers != null) {
+ present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE];
+ data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE];
+ dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE];
+ lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE];
secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE];
+ }
if (LOG.isDebugEnabled()) {
- LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" +
+ LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} vectors: {} columnEncoding: {}" +
" present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}",
- columnIndex, schema, streamBuffers.length, columnEncoding, present != null,
+ columnIndex, schema, streamBuffers == null ? 0 : streamBuffers.length,
+ vectors == null ? 0 : vectors.size(), columnEncoding, present != null,
data, dictionary != null, lengths != null, secondary != null,
context.getWriterTimezone());
}
+ // TODO: get rid of the builders - they serve no purpose... just call ctors directly.
switch (schema.getCategory()) {
case BINARY:
case BOOLEAN:
@@ -1833,9 +2196,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
case DECIMAL:
case TIMESTAMP:
case DATE:
- return getPrimitiveTreeReaders(columnIndex, schema, codec, columnEncoding,
- present, data, dictionary, lengths, secondary, context);
+ return getPrimitiveTreeReader(columnIndex, schema, codec, columnEncoding,
+ present, data, dictionary, lengths, secondary, context, vectors);
case LIST:
+ assert vectors == null; // Not currently supported.
TypeDescription elementType = schema.getChildren().get(0);
TreeReader elementReader = createEncodedTreeReader(
elementType, encodings, batch, codec, context);
@@ -1849,6 +2213,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setContext(context)
.build();
case MAP:
+ assert vectors == null; // Not currently supported.
TypeDescription keyType = schema.getChildren().get(0);
TypeDescription valueType = schema.getChildren().get(1);
TreeReader keyReader = createEncodedTreeReader(
@@ -1866,6 +2231,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setContext(context)
.build();
case STRUCT: {
+ assert vectors == null; // Not currently supported.
int childCount = schema.getChildren().size();
TreeReader[] childReaders = new TreeReader[childCount];
for (int i = 0; i < childCount; i++) {
@@ -1883,6 +2249,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.build();
}
case UNION: {
+ assert vectors == null; // Not currently supported.
int childCount = schema.getChildren().size();
TreeReader[] childReaders = new TreeReader[childCount];
for (int i = 0; i < childCount; i++) {
@@ -1905,11 +2272,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
}
- private static TreeReader getPrimitiveTreeReaders(final int columnIndex,
+ private static TreeReader getPrimitiveTreeReader(final int columnIndex,
TypeDescription columnType, CompressionCodec codec, OrcProto.ColumnEncoding columnEncoding,
ColumnStreamData present, ColumnStreamData data, ColumnStreamData dictionary,
- ColumnStreamData lengths, ColumnStreamData secondary, TreeReaderFactory.Context context)
- throws IOException {
+ ColumnStreamData lengths, ColumnStreamData secondary, TreeReaderFactory.Context context,
+ List<ColumnVector> vectors) throws IOException {
switch (columnType.getCategory()) {
case BINARY:
return BinaryStreamReader.builder()
@@ -1919,6 +2286,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setLengthStream(lengths)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
case BOOLEAN:
@@ -1927,6 +2295,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setPresentStream(present)
.setDataStream(data)
.setCompressionCodec(codec)
+ .setVectors(vectors)
.build();
case BYTE:
return ByteStreamReader.builder()
@@ -1934,6 +2303,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setPresentStream(present)
.setDataStream(data)
.setCompressionCodec(codec)
+ .setVectors(vectors)
.build();
case SHORT:
return ShortStreamReader.builder()
@@ -1942,6 +2312,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDataStream(data)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
case INT:
@@ -1951,6 +2322,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDataStream(data)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
case LONG:
@@ -1960,6 +2332,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDataStream(data)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
case FLOAT:
@@ -1968,6 +2341,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setPresentStream(present)
.setDataStream(data)
.setCompressionCodec(codec)
+ .setVectors(vectors)
.build();
case DOUBLE:
return DoubleStreamReader.builder()
@@ -1975,6 +2349,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setPresentStream(present)
.setDataStream(data)
.setCompressionCodec(codec)
+ .setVectors(vectors)
.build();
case CHAR:
return CharStreamReader.builder()
@@ -1986,6 +2361,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDictionaryStream(dictionary)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.build();
case VARCHAR:
return VarcharStreamReader.builder()
@@ -1997,6 +2373,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDictionaryStream(dictionary)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.build();
case STRING:
return StringStreamReader.builder()
@@ -2007,6 +2384,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDictionaryStream(dictionary)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.build();
case DECIMAL:
return DecimalStreamReader.builder()
@@ -2018,6 +2396,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setScaleStream(secondary)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
case TIMESTAMP:
@@ -2028,6 +2407,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setNanosStream(secondary)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
case DATE:
@@ -2037,6 +2417,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
.setDataStream(data)
.setCompressionCodec(codec)
.setColumnEncoding(columnEncoding)
+ .setVectors(vectors)
.setContext(context)
.build();
default:
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 1c5f0e6..31b0609 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -19,12 +19,15 @@
package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.orc.DataReader;
import org.apache.orc.OrcProto;
@@ -33,6 +36,17 @@ import org.apache.orc.OrcProto;
*/
public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
+ /**
+ * Creates the encoded reader.
+ * @param fileKey File ID to read, to use for cache lookups and such.
+ * @param dataCache Data cache to use for cache lookups.
+ * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
+ * @param pf Pool factory to create object pools.
+ * @return The reader.
+ */
+ EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
+ PoolFactory pf) throws IOException;
+
/** The factory that can create (or return) the pools used by encoded reader. */
public interface PoolFactory {
<T> Pool<T> createPool(int size, PoolObjectHelper<T> helper);
@@ -61,16 +75,49 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
public void initOrcColumn(int colIx) {
super.initColumn(colIx, MAX_DATA_STREAMS);
}
- }
- /**
- * Creates the encoded reader.
- * @param fileKey File ID to read, to use for cache lookups and such.
- * @param dataCache Data cache to use for cache lookups.
- * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
- * @param pf Pool factory to create object pools.
- * @return The reader.
- */
- EncodedReader encodedReader(
- Object fileKey, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException;
+ /**
+ * Same as columnData, but for the data that already comes as VRBs.
+ * The combination of the two contains all the necessary data,
+ */
+ protected List<ColumnVector>[] columnVectors;
+
+ @Override
+ public void reset() {
+ super.reset();
+ if (columnVectors == null) return;
+ Arrays.fill(columnVectors, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void initColumnWithVectors(int colIx, List<ColumnVector> data) {
+ if (columnVectors == null) {
+ columnVectors = new List[columnData.length];
+ }
+ columnVectors[colIx] = data;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void resetColumnArrays(int columnCount) {
+ super.resetColumnArrays(columnCount);
+ if (columnVectors != null && columnCount == columnVectors.length) {
+ Arrays.fill(columnVectors, null);
+ return;
+ } else if (columnVectors != null) {
+ columnVectors = new List[columnCount];
+ } else {
+ columnVectors = null;
+ }
+ }
+
+ public boolean hasVectors(int colIx) {
+ return columnVectors != null && columnVectors[colIx] != null;
+ }
+
+ public List<ColumnVector> getColumnVectors(int colIx) {
+ if (!hasVectors(colIx)) throw new AssertionError("No data for column " + colIx);
+ return columnVectors[colIx];
+ }
+ }
}
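For context, a minimal consumer-side sketch of the new vector-aware batch API above, assuming an OrcEncodedColumnBatch-style object; processVector and processStreams are hypothetical helpers, not part of this patch:

// Sketch only: branch on whether a column arrived pre-decoded as vectors
// (the new SerDe text-cache path) or as the usual encoded ORC streams.
void consumeColumn(Reader.OrcEncodedColumnBatch batch, int colIx) {
  if (batch.hasVectors(colIx)) {
    for (ColumnVector cv : batch.getColumnVectors(colIx)) {
      processVector(cv); // hypothetical: pass the pre-decoded vector onward
    }
  } else {
    processStreams(batch, colIx); // hypothetical: decode the ORC streams
  }
}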
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
index b894c11e..63084b9 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java
@@ -113,6 +113,7 @@ public class EncodedColumnBatch<BatchKey> {
}
}
+
public void initColumn(int colIx, int streamCount) {
hasData[colIx] = true;
if (columnData[colIx] == null || columnData[colIx].length != streamCount) {
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index bbd9ca6..f914a22 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -472,4 +472,15 @@ public class BytesColumnVector extends ColumnVector {
}
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ BytesColumnVector other = (BytesColumnVector)otherCv;
+ super.shallowCopyTo(other);
+ other.nextFree = nextFree;
+ other.vector = vector;
+ other.start = start;
+ other.length = length;
+ other.buffer = buffer;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index 6f090a1..065c1fa 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -214,4 +214,16 @@ public abstract class ColumnVector {
*/
public abstract void stringifyValue(StringBuilder buffer,
int row);
+
+ /**
+ * Shallow copy of the contents of this vector to the other vector;
+ * replaces other vector's values.
+ */
+ public void shallowCopyTo(ColumnVector otherCv) {
+ otherCv.isNull = isNull;
+ otherCv.noNulls = noNulls;
+ otherCv.isRepeating = isRepeating;
+ otherCv.preFlattenIsRepeating = preFlattenIsRepeating;
+ otherCv.preFlattenNoNulls = preFlattenNoNulls;
+ }
}
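A hedged usage sketch of the shallowCopyTo contract, using the LongColumnVector override added further below; sizes are illustrative. After the call both vectors alias the same backing array, i.e. this is reference sharing, not a deep copy:

// Sketch only.
LongColumnVector src = new LongColumnVector(1024);
LongColumnVector dst = new LongColumnVector(1024);
src.vector[0] = 42L;
src.shallowCopyTo(dst);
// dst now shares src's storage: mutations through src are visible via dst.
assert dst.vector == src.vector;
assert dst.vector[0] == 42L;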
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index e4f8d82..67076eb 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.math.BigInteger;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.common.type.FastHiveDecimal;
public class DecimalColumnVector extends ColumnVector {
@@ -142,4 +140,13 @@ public class DecimalColumnVector extends ColumnVector {
vector[i] = new HiveDecimalWritable(0); // Initially zero.
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ DecimalColumnVector other = (DecimalColumnVector)otherCv;
+ super.shallowCopyTo(other);
+ other.scale = scale;
+ other.precision = precision;
+ other.vector = vector;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index bd421f4..11409bd 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
-import java.io.IOException;
import java.util.Arrays;
/**
@@ -175,4 +174,11 @@ public class DoubleColumnVector extends ColumnVector {
}
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ DoubleColumnVector other = (DoubleColumnVector)otherCv;
+ super.shallowCopyTo(other);
+ other.vector = vector;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
index c4a6c0f..e876c05 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
@@ -364,4 +364,12 @@ public class IntervalDayTimeColumnVector extends ColumnVector {
}
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ IntervalDayTimeColumnVector other = (IntervalDayTimeColumnVector)otherCv;
+ super.shallowCopyTo(other);
+ other.totalSeconds = totalSeconds;
+ other.nanos = nanos;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index 80d4731..3ae6a33 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
-import java.io.IOException;
import java.util.Arrays;
/**
@@ -221,4 +220,11 @@ public class LongColumnVector extends ColumnVector {
}
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ LongColumnVector other = (LongColumnVector)otherCv;
+ super.shallowCopyTo(other);
+ other.vector = vector;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
index 1aeff83..892e8d8 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MultiValuedColumnVector.java
@@ -147,4 +147,8 @@ public abstract class MultiValuedColumnVector extends ColumnVector {
childCount = 0;
}
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ throw new UnsupportedOperationException(); // Implement in the future, if needed.
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
index cf07bca..a361899 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
@@ -129,4 +129,9 @@ public class StructColumnVector extends ColumnVector {
fields[i].setRepeating(isRepeating);
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ throw new UnsupportedOperationException(); // Implement if needed.
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
index 28997a0..9d579ce 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
@@ -416,4 +416,12 @@ public class TimestampColumnVector extends ColumnVector {
}
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ TimestampColumnVector other = (TimestampColumnVector)otherCv;
+ super.shallowCopyTo(other);
+ other.time = time;
+ other.nanos = nanos;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
index 0c61243..151f791 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
@@ -137,4 +137,9 @@ public class UnionColumnVector extends ColumnVector {
fields[i].setRepeating(isRepeating);
}
}
+
+ @Override
+ public void shallowCopyTo(ColumnVector otherCv) {
+ throw new UnsupportedOperationException(); // Implement if needed.
+ }
}
[3/3] hive git commit: HIVE-15672 : LLAP text cache: improve first
query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran,
Owen O'Malley)
Posted by se...@apache.org.
HIVE-15672 : LLAP text cache: improve first query perf II (Sergey Shelukhin, reviewed by Prasanth Jayachandran, Owen O'Malley)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f273cc5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f273cc5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f273cc5
Branch: refs/heads/master
Commit: 8f273cc53f62c79f8ac30453e3ff94717d91b4dd
Parents: fbe9b05
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Feb 9 18:26:22 2017 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Feb 9 18:26:22 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 13 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 22 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 883 +++++++++++++------
.../io/encoded/VertorDeserializeOrcWriter.java | 285 +++++-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 39 +
.../orc/encoded/EncodedTreeReaderFactory.java | 497 +++++++++--
.../hadoop/hive/ql/io/orc/encoded/Reader.java | 69 +-
.../common/io/encoded/EncodedColumnBatch.java | 1 +
.../hive/ql/exec/vector/BytesColumnVector.java | 11 +
.../hive/ql/exec/vector/ColumnVector.java | 12 +
.../ql/exec/vector/DecimalColumnVector.java | 11 +-
.../hive/ql/exec/vector/DoubleColumnVector.java | 8 +-
.../vector/IntervalDayTimeColumnVector.java | 8 +
.../hive/ql/exec/vector/LongColumnVector.java | 8 +-
.../ql/exec/vector/MultiValuedColumnVector.java | 4 +
.../hive/ql/exec/vector/StructColumnVector.java | 5 +
.../ql/exec/vector/TimestampColumnVector.java | 8 +
.../hive/ql/exec/vector/UnionColumnVector.java | 5 +
19 files changed, 1504 insertions(+), 388 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e82758f..b27b663 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2922,6 +2922,9 @@ public class HiveConf extends Configuration {
LLAP_ALLOCATOR_MAX_ALLOC + "."),
LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED("hive.llap.io.encode.vector.serde.enabled", true,
"Whether LLAP should use vectorized SerDe reader to read text data when re-encoding."),
+ LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED("hive.llap.io.encode.vector.serde.async.enabled",
+ true,
+ "Whether LLAP should use async mode in vectorized SerDe reader to read text data."),
LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
"Row count to use to separate cache slices when reading encoded data from row-based\n" +
"inputs into LLAP cache, if this feature is enabled."),
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
index 85fae9a..4809398 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -113,7 +113,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
public static final class StripeData {
// In LRR case, if we just store 2 boundaries (which could be split boundaries or reader
- // positions, we wouldn't be able to account for torn rows correctly because the semantics of
+ // positions), we wouldn't be able to account for torn rows correctly because the semantics of
// our "exact" reader positions, and inexact split boundaries, are different. We cannot even
// tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file split
// wrt first row when caching - we may produce incorrect result if we adjust the split
@@ -182,7 +182,7 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
+ firstStart + " to [" + lastStart + ", " + lastEnd + ")";
}
- public static StripeData duplicateForResults(StripeData s) {
+ public static StripeData duplicateStructure(StripeData s) {
return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
}
@@ -389,14 +389,14 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Got stripe in cache " + cStripe);
}
- StripeData stripe = StripeData.duplicateForResults(cStripe);
+ StripeData stripe = StripeData.duplicateStructure(cStripe);
result.stripes.add(stripe);
boolean isMissed = false;
for (int colIx = 0; colIx < cached.colCount; ++colIx) {
if (!includes[colIx]) continue;
if (cStripe.encodings[colIx] == null || cStripe.data[colIx] == null) {
if (cStripe.data[colIx] != null) {
- assert false : cStripe;
+ throw new AssertionError(cStripe);
// No encoding => must have no data.
}
isMissed = true;
@@ -419,9 +419,9 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
LlapIoImpl.CACHE_LOGGER.info("Couldn't lock data for stripe at "
+ stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
+ handleRemovedColumnData(cColData);
cColData = null;
isMissed = true;
- handleRemovedColumnData(cColData);
if (gotAllData != null) {
gotAllData.value = false;
}
@@ -432,6 +432,9 @@ public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugD
// At this point, we have arrived at the level where we need all the data, and the
// arrays never change. So we will just do a shallow assignment here instead of copy.
stripe.data[colIx] = cColData;
+ if (cColData == null) {
+ stripe.encodings[colIx] = null;
+ }
}
doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
}
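The reordering above moves handleRemovedColumnData ahead of the null assignment, fixing a use-after-clear where the cleanup hook always received null. A minimal before/after sketch of the pattern:

// Before (buggy): the reference is cleared before the cleanup hook sees it.
cColData = null;
isMissed = true;
handleRemovedColumnData(cColData); // always called with null

// After (fixed): clean up the live reference first, then drop it.
handleRemovedColumnData(cColData);
cColData = null;
isMissed = true;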
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 4295c1c..8d96e7b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata;
-import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -53,12 +52,8 @@ import org.apache.orc.impl.SchemaEvolution;
import org.apache.orc.impl.TreeReaderFactory;
import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
import org.apache.orc.impl.TreeReaderFactory.TreeReader;
-import org.apache.orc.OrcProto;
import org.apache.orc.impl.WriterImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
+import org.apache.orc.OrcProto;
public class OrcEncodedDataConsumer
@@ -134,7 +129,7 @@ public class OrcEncodedDataConsumer
schema, stripeMetadata.getEncodings(), batch, codec, context, columnMapping);
this.columnReaders = treeReader.getChildReaders();
this.columnMapping = Arrays.copyOf(columnMapping, columnReaders.length);
- positionInStreams(columnReaders, batch, stripeMetadata);
+ positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata);
} else {
repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata);
}
@@ -225,8 +220,8 @@ public class OrcEncodedDataConsumer
}
private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
- EncodedColumnBatch<OrcBatchKey> batch, ConsumerStripeMetadata stripeMetadata) throws IOException {
- PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+ OrcBatchKey batchKey, ConsumerStripeMetadata stripeMetadata) throws IOException {
+ PositionProvider[] pps = createPositionProviders(columnReaders, batchKey, stripeMetadata);
if (pps == null) return;
for (int i = 0; i < columnReaders.length; i++) {
columnReaders[i].seek(pps);
@@ -236,10 +231,13 @@ public class OrcEncodedDataConsumer
private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders,
EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe,
ConsumerStripeMetadata stripeMetadata) throws IOException {
- PositionProvider[] pps = createPositionProviders(columnReaders, batch, stripeMetadata);
+ PositionProvider[] pps = createPositionProviders(
+ columnReaders, batch.getBatchKey(), stripeMetadata);
if (pps == null) return;
for (int i = 0; i < columnReaders.length; i++) {
TreeReader reader = columnReaders[i];
+ // Note: we assume this never happens for SerDe reader - the batch would never have vectors.
+ // That is always true now; if it ever stopped being true, the call below would throw in getColumnData.
((SettableTreeReader) reader).setBuffers(batch, sameStripe);
// TODO: When hive moves to java8, make updateTimezone() as default method in
// SettableTreeReader so that we can avoid this check.
@@ -268,7 +266,7 @@ public class OrcEncodedDataConsumer
}
private PositionProvider[] createPositionProviders(
- TreeReaderFactory.TreeReader[] columnReaders, EncodedColumnBatch<OrcBatchKey> batch,
+ TreeReaderFactory.TreeReader[] columnReaders, OrcBatchKey batchKey,
ConsumerStripeMetadata stripeMetadata) throws IOException {
if (columnReaders.length == 0) return null;
PositionProvider[] pps = null;
@@ -279,7 +277,7 @@ public class OrcEncodedDataConsumer
pps[i] = singleRgPp;
}
} else {
- int rowGroupIndex = batch.getBatchKey().rgIx;
+ int rowGroupIndex = batchKey.rgIx;
if (rowGroupIndex == OrcEncodedColumnBatch.ALL_RGS) {
throw new IOException("Cannot position readers without RG information");
}
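For orientation, a hedged sketch of the decoupling above: position providers are now derived from the OrcBatchKey alone, so callers can seek tree readers without holding stream data (which vector-backed SerDe batches would not have):

// Sketch, paraphrasing the call sites from the diff.
PositionProvider[] pps = createPositionProviders(columnReaders, batch.getBatchKey(), stripeMetadata);
if (pps != null) {
  for (TreeReaderFactory.TreeReader reader : columnReaders) {
    reader.seek(pps); // seek each column reader to its recorded row-group positions
  }
}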
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
index 8d86d17..419043a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java
@@ -52,6 +52,10 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata;
import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.VertorDeserializeOrcWriter.AsyncCallback;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions;
@@ -158,19 +162,21 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final InputFormat<?, ?> sourceInputFormat;
private final Reporter reporter;
private final JobConf jobConf;
+ private final TypeDescription schema;
private final int allocSize;
private final int targetSliceRowCount;
private final boolean isLrrEnabled;
private final boolean[] writerIncludes;
- private EncodingWriter writer = null;
- private CacheWriter cacheWriter = null;
+ private FileReaderYieldReturn currentFileRead = null;
+
/**
* Data from cache currently being processed. We store it here so that we could decref
* it in case of failures. We remove each slice from the data after it has been sent to
* the consumer, at which point the consumer is responsible for it.
*/
private FileData cachedData;
+ private List<VertorDeserializeOrcWriter> asyncWriters = new ArrayList<>();
public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache,
BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split,
@@ -210,6 +216,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
this.sourceSerDe = sourceSerDe;
this.reporter = reporter;
this.jobConf = jobConf;
+ this.schema = schema;
this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds);
SchemaEvolution evolution = new SchemaEvolution(schema,
new Reader.Options(jobConf).include(writerIncludes));
@@ -243,7 +250,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
throw new UnsupportedOperationException();
}
- // TODO: move to base class?
+ // TODO: move to a base class?
@Override
protected Void callInternal() throws IOException, InterruptedException {
return ugi.doAs(new PrivilegedExceptionAction<Void>() {
@@ -254,21 +261,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
});
}
- public static class CacheOutStream extends OutStream {
- private final CacheOutputReceiver receiver;
- public CacheOutStream(String name, int bufferSize, CompressionCodec codec,
- CacheOutputReceiver receiver) throws IOException {
- super(name, bufferSize, codec, receiver);
- this.receiver = receiver;
- }
-
- @Override
- public void clear() throws IOException {
- super.clear();
- receiver.clear();
- }
- }
-
/** A row-based (Writable) reader that may also be able to report file offsets. */
interface ReaderWithOffsets {
/** Moves the reader to the next row. */
@@ -333,7 +325,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private CacheStripeData currentStripe;
private final List<CacheStripeData> stripes = new ArrayList<>();
private final BufferUsageManager bufferManager;
- private final int bufferSize;
+ /**
+ * For !doesSourceHaveIncludes case, stores global column IDs to verify writer columns.
+ * For doesSourceHaveIncludes case, stores source column IDs used to map things.
+ */
private final List<Integer> columnIds;
private final boolean[] writerIncludes;
// These are global since ORC reuses objects between stripes.
@@ -341,14 +336,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>();
private final boolean doesSourceHaveIncludes;
- public CacheWriter(BufferUsageManager bufferManager, int bufferSize,
- List<Integer> columnIds, boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
+ public CacheWriter(BufferUsageManager bufferManager, List<Integer> columnIds,
+ boolean[] writerIncludes, boolean doesSourceHaveIncludes) {
this.bufferManager = bufferManager;
- this.bufferSize = bufferSize;
- this.columnIds = columnIds;
assert writerIncludes != null; // Taken care of on higher level.
this.writerIncludes = writerIncludes;
this.doesSourceHaveIncludes = doesSourceHaveIncludes;
+ this.columnIds = columnIds;
startStripe();
}
@@ -456,18 +450,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
@Override
public void writeHeader() throws IOException {
-
}
@Override
- public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec) throws IOException {
+ public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index,
+ CompressionCodec codec) throws IOException {
// TODO: right now we treat each slice as a stripe with a single RG and never bother
// with indexes. In phase 4, we need to add indexing and filtering.
}
@Override
- public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec) throws IOException {
-
+ public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom,
+ CompressionCodec codec) throws IOException {
}
@Override
@@ -545,9 +539,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
@Override
- public void appendRawStripe(ByteBuffer stripe,
- OrcProto.StripeInformation.Builder dirEntry
- ) throws IOException {
+ public void appendRawStripe(
+ ByteBuffer stripe, OrcProto.StripeInformation.Builder dirEntry) throws IOException {
throw new UnsupportedOperationException(); // Only used in ACID writer.
}
@@ -586,7 +579,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
@Override
public void suppress() {
suppressed = true;
- buffers = null;
lastBufferPos = -1;
}
@@ -639,6 +631,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private static class NullOutputReceiver implements OutputReceiver {
+ @SuppressWarnings("unused")
private final StreamName name;
public NullOutputReceiver(StreamName name) {
@@ -655,6 +648,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
protected Void performDataRead() throws IOException {
+ boolean isOk = false;
try {
try {
long startTime = counters.startTimeCounter();
@@ -667,10 +661,13 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
try {
isFromCache = readFileWithCache(startTime);
} finally {
+ // Note that the code removes the data from the field as it's passed to the consumer,
+ // so we expect data to remain in the field only in case of errors.
if (cachedData != null && cachedData.getData() != null) {
for (StripeData sd : cachedData.getData()) {
unlockAllBuffers(sd);
}
+ cachedData = null;
}
}
if (isFromCache == null) return null; // Stop requested, and handled inside.
@@ -680,20 +677,24 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
// Done with all the things.
recordReaderTime(startTime);
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("done processing {}", split);
+ }
} catch (Throwable e) {
LlapIoImpl.LOG.error("Exception while processing", e);
consumer.setError(e);
throw e;
}
consumer.setDone();
-
- LlapIoImpl.LOG.trace("done processing {}", split);
+ isOk = true;
return null;
} finally {
- cleanupReaders();
+ cleanup(!isOk);
+ // Do not clean up the writers - the callback should do it.
}
}
+
private void unlockAllBuffers(StripeData si) {
for (int i = 0; i < si.getData().length; ++i) {
LlapDataBuffer[][] colData = si.getData()[i];
@@ -718,6 +719,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
// Make data consistent with encodings, don't store useless information.
if (sd.getData()[i] == null) {
encodings[i] = null;
+ } else if (encodings[i] == null) {
+ throw new AssertionError("Caching data without an encoding at " + i + ": " + sd);
}
}
FileData fd = new FileData(fileKey, encodings.length);
@@ -732,7 +735,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
// This relies on sequence of calls to cacheFileData and sendEcb..
}
-
private void lockAllBuffers(StripeData sd) {
for (int i = 0; i < sd.getData().length; ++i) {
LlapDataBuffer[][] colData = sd.getData()[i];
@@ -755,7 +757,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
this.cachedData = cache.getFileData(fileKey, split.getStart(),
endOfSplit, writerIncludes, CC_FACTORY, counters, gotAllData);
if (cachedData == null) {
- LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("No data for the split found in cache");
+ }
return false;
}
String[] hosts = extractHosts(split, false), inMemoryHosts = extractHosts(split, true);
@@ -796,7 +800,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
if (uncachedSuffixStart < endOfSplit || isUnfortunate) {
- // TODO: will 0-length split work? should we assume 1+ chars and add 1 for isUnfortunate?
+ // Note: we assume a 0-length split is correct given how LRR interprets offsets (reading an
+ // extra row). Should we instead assume 1+ chars and add 1 for isUnfortunate?
FileSplit splitPart = new FileSplit(split.getPath(), uncachedSuffixStart,
endOfSplit - uncachedSuffixStart, hosts, inMemoryHosts);
if (!processOneFileSplit(splitPart, startTime, stripeIx, null)) return null;
@@ -806,101 +811,119 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
public boolean processOneFileSplit(FileSplit split, long startTime,
Ref<Integer> stripeIxRef, StripeData slice) throws IOException {
- ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
LlapIoImpl.LOG.info("Processing one split {" + split.getPath() + ", "
+ split.getStart() + ", " + split.getLength() + "}");
if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
LlapIoImpl.CACHE_LOGGER.trace("Cache data for the split is " + slice);
}
- boolean[] splitIncludes = writerIncludes;
- boolean hasAllData = false;
- if (cacheEncodings != null) {
- hasAllData = true;
- splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
- for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
- if (!splitIncludes[colIx]) continue;
- assert (cacheEncodings[colIx] != null) == (slice.getData()[colIx] != null);
- if (cacheEncodings[colIx] != null) {
- splitIncludes[colIx] = false;
- } else {
- hasAllData = false;
- }
- }
- }
- if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("Includes accounting for cached data: before " + DebugUtils.toString(
- writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
- }
+ boolean[] splitIncludes = Arrays.copyOf(writerIncludes, writerIncludes.length);
+ boolean hasAllData = slice != null
+ && determineSplitIncludes(slice, splitIncludes, writerIncludes);
+
// We have 3 cases here:
// 1) All the data is in the cache. Always a single slice, no disk read, no cache puts.
// 2) Some data is in the cache. Always a single slice, disk read and a single cache put.
// 3) No data is in the cache. Multiple slices, disk read and multiple cache puts.
- if (!hasAllData) {
- // This initializes cacheWriter with data.
- readSplitFromFile(split, splitIncludes, slice);
- assert cacheWriter != null;
- }
- if (slice != null) {
- // If we had a cache range already, it should not have been split.
- assert cacheWriter == null || cacheWriter.stripes.size() == 1;
- CacheWriter.CacheStripeData csd = hasAllData ? null : cacheWriter.stripes.get(0);
+ if (hasAllData) {
+ // Everything comes from cache.
+ CacheWriter.CacheStripeData csd = null;
boolean result = processOneSlice(csd, splitIncludes, stripeIxRef.value, slice, startTime);
++stripeIxRef.value;
return result;
- } else {
- for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
- if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
- return false;
+ }
+
+ boolean result = false;
+ // This initializes currentFileRead.
+ startReadSplitFromFile(split, splitIncludes, slice);
+ try {
+ if (slice != null) {
+ // If we had a cache range already, we expect a single matching disk slice.
+ Vectors vectors = currentFileRead.readNextSlice();
+ if (!vectors.isSupported()) {
+ // Not in VRB mode - the new cache data is ready, we should use it.
+ CacheWriter cacheWriter = currentFileRead.getCacheWriter();
+ assert cacheWriter.stripes.size() == 1;
+ result = processOneSlice(
+ cacheWriter.stripes.get(0), splitIncludes, stripeIxRef.value, slice, startTime);
+ } else {
+ // VRB mode - process the VRBs with cache data; the new cache data is coming later.
+ result = processOneSlice(
+ vectors, splitIncludes, stripeIxRef.value, slice, startTime);
}
+ assert null == currentFileRead.readNextSlice();
++stripeIxRef.value;
+ } else {
+ // All the data comes from disk. The reader may have split it into multiple slices.
+ Vectors vectors = currentFileRead.readNextSlice();
+ assert vectors != null;
+ result = true;
+ if (!vectors.isSupported()) {
+ // Not in VRB mode - the new cache data is (partially) ready, we should use it.
+ while (currentFileRead.readNextSlice() != null); // Force the rest of the data through.
+ CacheWriter cacheWriter = currentFileRead.getCacheWriter();
+ for (CacheWriter.CacheStripeData csd : cacheWriter.stripes) {
+ if (!processOneSlice(csd, splitIncludes, stripeIxRef.value, null, startTime)) {
+ result = false;
+ break;
+ }
+ ++stripeIxRef.value;
+ }
+ } else {
+ // VRB mode - process the VRBs with cache data; the new cache data is coming later.
+ do {
+ assert vectors.isSupported();
+ if (!processOneSlice(vectors, splitIncludes, stripeIxRef.value, null, startTime)) {
+ result = false;
+ break;
+ }
+ ++stripeIxRef.value;
+ } while ((vectors = currentFileRead.readNextSlice()) != null);
+ }
}
- return true;
+ } finally {
+ cleanUpCurrentRead();
}
+ return result;
}
- private boolean processOneSlice(CacheWriter.CacheStripeData csd, boolean[] splitIncludes,
- int stripeIx, StripeData slice, long startTime) throws IOException {
- String sliceStr = slice == null ? "null" : slice.toCoordinateString();
- if (LlapIoImpl.LOG.isDebugEnabled()) {
- LlapIoImpl.LOG.debug("Processing slice #" + stripeIx + " " + sliceStr + "; has"
- + ((slice == null) ? " no" : "") + " cache data; has" + ((csd == null) ? " no" : "")
- + " disk data");
+ private static boolean determineSplitIncludes(
+ StripeData slice, boolean[] splitIncludes, boolean[] writerIncludes) {
+ ColumnEncoding[] cacheEncodings = slice.getEncodings();
+ assert cacheEncodings != null;
+ boolean hasAllData = true;
+ for (int colIx = 0; colIx < cacheEncodings.length; ++colIx) {
+ if (!splitIncludes[colIx]) continue;
+ if ((cacheEncodings[colIx] != null) != (slice.getData()[colIx] != null)) {
+ throw new AssertionError("Inconsistent cache slice " + slice);
+ }
+ if (cacheEncodings[colIx] != null) {
+ splitIncludes[colIx] = false;
+ } else {
+ hasAllData = false;
+ }
+ }
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("Includes accounting for cached data: before " + DebugUtils.toString(
+ writerIncludes) + ", after " + DebugUtils.toString(splitIncludes));
}
+ return hasAllData;
+ }
- ColumnEncoding[] cacheEncodings = slice == null ? null : slice.getEncodings();
- LlapDataBuffer[][][] cacheData = slice == null ? null : slice.getData();
- long cacheRowCount = slice == null ? -1L : slice.getRowCount();
+ private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[] splitIncludes,
+ int stripeIx, StripeData cacheData, long startTime) throws IOException {
+ logProcessOneSlice(stripeIx, diskData, cacheData);
+
+ ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
+ LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+ long cacheRowCount = cacheData == null ? -1L : cacheData.getRowCount();
SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
StripeData sliceToCache = null;
- boolean hasAllData = csd == null;
+ boolean hasAllData = diskData == null;
if (!hasAllData) {
- if (slice == null) {
- sliceToCache = new StripeData(
- csd.knownTornStart, csd.firstRowStart, csd.lastRowStart, csd.lastRowEnd,
- csd.rowCount, csd.encodings.toArray(new ColumnEncoding[csd.encodings.size()]));
- } else {
- if (csd.rowCount != slice.getRowCount()) {
- throw new IOException("Row count mismatch; disk " + csd.rowCount + ", cache "
- + slice.getRowCount() + " from " + csd + " and " + slice);
- }
- if (csd.encodings.size() != slice.getEncodings().length) {
- throw new IOException("Column count mismatch; disk " + csd.encodings.size()
- + ", cache " + slice.getEncodings().length + " from " + csd + " and " + slice);
- }
- if (LlapIoImpl.LOG.isDebugEnabled()) {
- LlapIoImpl.LOG.debug("Creating slice to cache in addition to an existing slice "
- + slice.toCoordinateString() + "; disk offsets were " + csd.toCoordinateString());
- }
- sliceToCache = StripeData.duplicateForResults(slice);
- for (int i = 0; i < csd.encodings.size(); ++i) {
- sliceToCache.getEncodings()[i] = csd.encodings.get(i);
- }
- sliceToCache.setKnownTornStart(Math.min(csd.knownTornStart, slice.getKnownTornStart()));
- }
- metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, csd.encodings));
- metadata.setRowCount(csd.rowCount);
+ sliceToCache = createSliceToCache(diskData, cacheData);
+ metadata.setEncodings(combineCacheAndWriterEncodings(cacheEncodings, diskData.encodings));
+ metadata.setRowCount(diskData.rowCount);
} else {
- assert cacheWriter == null;
metadata.setEncodings(Lists.newArrayList(cacheEncodings));
metadata.setRowCount(cacheRowCount);
}
@@ -916,46 +939,28 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
if (!hasAllData && splitIncludes[colIx]) {
// The column has been read from disk.
- List<CacheWriter.CacheStreamData> streams = csd.colStreams.get(colIx);
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
- }
- LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
- = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+ List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
+ LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
if (streams == null) continue; // Struct column, such as root?
Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
while (iter.hasNext()) {
CacheWriter.CacheStreamData stream = iter.next();
if (stream.isSuppressed) {
- LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+ }
iter.remove();
discardUncachedBuffers(stream.data);
continue;
}
- // TODO: We write each slice using a separate writer, so we don't share dictionaries. Fix?
+ int streamIx = setStreamDataToCache(newCacheDataForCol, stream);
ColumnStreamData cb = CSD_POOL.take();
cb.incRef();
- int streamIx = stream.name.getKind().getNumber();
cb.setCacheBuffers(stream.data);
- // This is kinda hacky - we "know" these are LlapDataBuffer-s.
- newCacheDataForCol[streamIx] = stream.data.toArray(
- new LlapDataBuffer[stream.data.size()]);
ecb.setStreamData(colIx, streamIx, cb);
}
} else {
- // The column has been obtained from cache.
- LlapDataBuffer[][] colData = cacheData[colIx];
- if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
- LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
- + SerDeLowLevelCacheImpl.toString(colData));
- }
- for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
- if (colData[streamIx] == null) continue;
- ColumnStreamData cb = CSD_POOL.take();
- cb.incRef();
- cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
- ecb.setStreamData(colIx, streamIx, cb);
- }
+ processColumnCacheData(cacheBuffers, ecb, colIx);
}
}
if (processStop()) {
@@ -969,7 +974,183 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
LlapIoImpl.CACHE_LOGGER.trace("Data to cache from the read " + sliceToCache);
}
cacheFileData(sliceToCache);
- return sendEcbToConsumer(ecb, slice != null, csd);
+ return sendEcbToConsumer(ecb, cacheData != null, diskData);
+ }
+
+ private void validateCacheAndDisk(StripeData cacheData,
+ long rowCount, long encodingCount, Object diskDataLog) throws IOException {
+ if (rowCount != cacheData.getRowCount()) {
+ throw new IOException("Row count mismatch; disk " + rowCount + ", cache "
+ + cacheData.getRowCount() + " from " + diskDataLog + " and " + cacheData);
+ }
+ if (encodingCount > 0 && encodingCount != cacheData.getEncodings().length) {
+ throw new IOException("Column count mismatch; disk " + encodingCount + ", cache "
+ + cacheData.getEncodings().length + " from " + diskDataLog + " and " + cacheData);
+ }
+ }
+
+
+ /** Unlike the other overload of processOneSlice, doesn't cache data. */
+ private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes,
+ int stripeIx, StripeData cacheData, long startTime) throws IOException {
+ if (diskData == null) {
+ throw new AssertionError(); // The other overload should have been used.
+ }
+ // LlapIoImpl.LOG.debug("diskData " + diskData);
+ logProcessOneSlice(stripeIx, diskData, cacheData);
+
+ if (cacheData == null && diskData.getRowCount() == 0) {
+ return true; // Nothing to process.
+ }
+ ColumnEncoding[] cacheEncodings = cacheData == null ? null : cacheData.getEncodings();
+ LlapDataBuffer[][][] cacheBuffers = cacheData == null ? null : cacheData.getData();
+ if (cacheData != null) {
+ // Don't validate column count - no encodings for vectors.
+ validateCacheAndDisk(cacheData, diskData.getRowCount(), -1, diskData);
+ }
+ SerDeStripeMetadata metadata = new SerDeStripeMetadata(stripeIx);
+ metadata.setEncodings(Arrays.asList(cacheEncodings == null
+ ? new ColumnEncoding[splitIncludes.length] : cacheEncodings));
+ metadata.setRowCount(diskData.getRowCount());
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("Derived stripe metadata for this split is " + metadata);
+ }
+ consumer.setStripeMetadata(metadata);
+
+ OrcEncodedColumnBatch ecb = ECB_POOL.take();
+ ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length);
+ int vectorsIx = 0;
+ for (int colIx = 0; colIx < writerIncludes.length; ++colIx) {
+ if (!writerIncludes[colIx]) continue;
+ if (splitIncludes[colIx]) {
+ // Skip the 0-th column, since it won't have a vector after reading the text source.
+ if (colIx != 0) {
+ List<ColumnVector> vectors = diskData.getVectors(vectorsIx++);
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors);
+ }
+ ecb.initColumnWithVectors(colIx, vectors);
+ } else {
+ ecb.initColumn(0, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+ }
+ } else {
+ ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
+ processColumnCacheData(cacheBuffers, ecb, colIx);
+ }
+ }
+ if (processStop()) {
+ recordReaderTime(startTime);
+ return false;
+ }
+ return sendEcbToConsumer(ecb, cacheData != null, null);
+ }
+
+
+ private void processAsyncCacheData(CacheWriter.CacheStripeData diskData,
+ boolean[] splitIncludes) throws IOException {
+ StripeData sliceToCache = new StripeData(diskData.knownTornStart, diskData.firstRowStart,
+ diskData.lastRowStart, diskData.lastRowEnd, diskData.rowCount,
+ diskData.encodings.toArray(new ColumnEncoding[diskData.encodings.size()]));
+ for (int colIx = 0; colIx < splitIncludes.length; ++colIx) {
+ if (!splitIncludes[colIx]) continue;
+ // The column has been read from disk.
+ List<CacheWriter.CacheStreamData> streams = diskData.colStreams.get(colIx);
+ LlapDataBuffer[][] newCacheDataForCol = createArrayToCache(sliceToCache, colIx, streams);
+ if (streams == null) continue; // Struct column, such as root?
+ Iterator<CacheWriter.CacheStreamData> iter = streams.iterator();
+ while (iter.hasNext()) {
+ CacheWriter.CacheStreamData stream = iter.next();
+ if (stream.isSuppressed) {
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("Removing a suppressed stream " + stream.name);
+ }
+ iter.remove();
+ discardUncachedBuffers(stream.data);
+ continue;
+ }
+ setStreamDataToCache(newCacheDataForCol, stream);
+ }
+ }
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("Data to cache from async read " + sliceToCache);
+ }
+ try {
+ cacheFileData(sliceToCache);
+ } finally {
+ unlockAllBuffers(sliceToCache);
+ }
+ }
+
+ private StripeData createSliceToCache(
+ CacheWriter.CacheStripeData diskData, StripeData cacheData) throws IOException {
+ assert diskData != null;
+ if (cacheData == null) {
+ return new StripeData(diskData.knownTornStart, diskData.firstRowStart,
+ diskData.lastRowStart, diskData.lastRowEnd, diskData.rowCount,
+ diskData.encodings.toArray(new ColumnEncoding[diskData.encodings.size()]));
+ } else {
+ long rowCount = diskData.rowCount, encodingCount = diskData.encodings.size();
+ validateCacheAndDisk(cacheData, rowCount, encodingCount, diskData);
+ if (LlapIoImpl.LOG.isDebugEnabled()) {
+ LlapIoImpl.LOG.debug("Creating slice to cache in addition to an existing slice "
+ + cacheData.toCoordinateString() + "; disk offsets were "
+ + diskData.toCoordinateString());
+ }
+ // Note: we could just do what we already do above from disk data, except for the validation
+ // that is not strictly necessary, and knownTornStart which is an optimization.
+ StripeData sliceToCache = StripeData.duplicateStructure(cacheData);
+ for (int i = 0; i < diskData.encodings.size(); ++i) {
+ sliceToCache.getEncodings()[i] = diskData.encodings.get(i);
+ }
+ sliceToCache.setKnownTornStart(Math.min(
+ diskData.knownTornStart, sliceToCache.getKnownTornStart()));
+ return sliceToCache;
+ }
+ }
+
+
+ private static LlapDataBuffer[][] createArrayToCache(
+ StripeData sliceToCache, int colIx, List<CacheWriter.CacheStreamData> streams) {
+ if (LlapIoImpl.LOG.isTraceEnabled()) {
+ LlapIoImpl.LOG.trace("Processing streams for column " + colIx + ": " + streams);
+ }
+ LlapDataBuffer[][] newCacheDataForCol = sliceToCache.getData()[colIx]
+ = new LlapDataBuffer[OrcEncodedColumnBatch.MAX_DATA_STREAMS][];
+ return newCacheDataForCol;
+ }
+
+ private static int setStreamDataToCache(
+ LlapDataBuffer[][] newCacheDataForCol, CacheWriter.CacheStreamData stream) {
+ int streamIx = stream.name.getKind().getNumber();
+ // This is kinda hacky - we "know" these are LlapDataBuffer-s.
+ newCacheDataForCol[streamIx] = stream.data.toArray(new LlapDataBuffer[stream.data.size()]);
+ return streamIx;
+ }
+
+ private void processColumnCacheData(LlapDataBuffer[][][] cacheBuffers,
+ OrcEncodedColumnBatch ecb, int colIx) {
+ // The column has been obtained from cache.
+ LlapDataBuffer[][] colData = cacheBuffers[colIx];
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("Processing cache data for column " + colIx + ": "
+ + SerDeLowLevelCacheImpl.toString(colData));
+ }
+ for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+ if (colData[streamIx] == null) continue;
+ ColumnStreamData cb = CSD_POOL.take();
+ cb.incRef();
+ cb.setCacheBuffers(Lists.<MemoryBuffer>newArrayList(colData[streamIx]));
+ ecb.setStreamData(colIx, streamIx, cb);
+ }
+ }
+
+ private void logProcessOneSlice(int stripeIx, Object diskData, StripeData cacheData) {
+ String sliceStr = cacheData == null ? "null" : cacheData.toCoordinateString();
+ if (LlapIoImpl.LOG.isDebugEnabled()) {
+ LlapIoImpl.LOG.debug("Processing slice #" + stripeIx + " " + sliceStr + "; has"
+ + ((cacheData == null) ? " no" : "") + " cache data; has"
+ + ((diskData == null) ? " no" : "") + " disk data");
+ }
}
private void discardUncachedBuffers(List<MemoryBuffer> list) {
@@ -1003,126 +1184,309 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
return Lists.newArrayList(combinedEncodings);
}
+ private static class Vectors {
+ private final List<ColumnVector>[] data;
+ private final boolean isSupported;
+ private final long rowCount;
+
+ @SuppressWarnings("unchecked")
+ public Vectors(List<VectorizedRowBatch> vrbs) {
+ if (vrbs == null) {
+ isSupported = false;
+ data = null;
+ rowCount = 0;
+ return;
+ }
+ isSupported = true;
+ if (vrbs.isEmpty()) {
+ data = null;
+ rowCount = 0;
+ return;
+ }
+ data = new List[vrbs.get(0).numCols];
+ for (int i = 0; i < data.length; ++i) {
+ data[i] = new ArrayList<>(vrbs.size());
+ }
+ int rowCount = 0;
+ for (VectorizedRowBatch vrb : vrbs) {
+ assert !vrb.selectedInUse;
+ rowCount += vrb.size;
+ for (int i = 0; i < vrb.cols.length; ++i) {
+ data[i].add(vrb.cols[i]);
+ }
+ }
+ this.rowCount = rowCount;
+ }
+
+ public List<ColumnVector> getVectors(int ix) {
+ return data[ix];
+ }
+
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ public boolean isSupported() {
+ return isSupported;
+ }
+
+ @Override
+ public String toString() {
+ return "Vectors {isSupported=" + isSupported + ", rowCount=" + rowCount
+ + ", data=" + Arrays.toString(data) + "}";
+ }
+ }
+
+ /**
+ * This class only exists because Java doesn't have yield return. The original method
+ * before this change only needed yield return-s sprinkled here and there; however,
+ * Java developers are usually paid by class, so here we go.
+ */
+ private static class FileReaderYieldReturn {
+ private ReaderWithOffsets offsetReader;
+ private int rowsPerSlice = 0;
+ private long currentKnownTornStart;
+ private long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
+ private boolean hasUnsplittableData = false;
+ private final EncodingWriter writer;
+ private final boolean maySplitTheSplit;
+ private final int targetSliceRowCount;
+ private final FileSplit split;
+
+ public FileReaderYieldReturn(ReaderWithOffsets offsetReader, FileSplit split, EncodingWriter writer,
+ boolean maySplitTheSplit, int targetSliceRowCount) {
+ this.offsetReader = offsetReader;
+ currentKnownTornStart = split.getStart();
+ this.writer = writer;
+ this.maySplitTheSplit = maySplitTheSplit;
+ this.targetSliceRowCount = targetSliceRowCount;
+ this.split = split;
+ }
+
+ public CacheWriter getCacheWriter() throws IOException {
+ return writer.getCacheWriter();
+ }
+
+ public Vectors readNextSlice() throws IOException {
+ if (offsetReader == null) return null;
+ try {
+ while (offsetReader.next()) {
+ hasUnsplittableData = true;
+ Writable value = offsetReader.getCurrentRow();
+ lastStartOffset = offsetReader.getCurrentRowStartOffset();
+ if (firstStartOffset == Long.MIN_VALUE) {
+ firstStartOffset = lastStartOffset;
+ }
+ writer.writeOneRow(value);
+
+ if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
+ assert offsetReader.hasOffsets();
+ writer.flushIntermediateData();
+ long fileOffset = offsetReader.getCurrentRowEndOffset();
+ // Must support offsets to be able to split.
+ if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
+ throw new AssertionError("Unable to get offsets from "
+ + offsetReader.getClass().getSimpleName());
+ }
+ writer.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ writer.writeIntermediateFooter();
+
+ // Split starting at row start will not read that row.
+ currentKnownTornStart = lastStartOffset;
+ // Row offsets will be determined from the reader (we could set the first from last).
+ lastStartOffset = Long.MIN_VALUE;
+ firstStartOffset = Long.MIN_VALUE;
+ rowsPerSlice = 0;
+ return new Vectors(writer.extractCurrentVrbs());
+ }
+ }
+ try {
+ Vectors result = null;
+ if (rowsPerSlice > 0 || (!maySplitTheSplit && hasUnsplittableData)) {
+ long fileOffset = -1;
+ if (!offsetReader.hasOffsets()) {
+ // The reader doesn't support offsets. We adjust offsets to match future splits.
+ // If cached split was starting at row start, that row would be skipped, so +1
+ firstStartOffset = split.getStart() + 1;
+ // Last row starting at the end of the split would be read.
+ lastStartOffset = split.getStart() + split.getLength();
+ // However, it must end after the split end, otherwise the next one would have been read.
+ fileOffset = lastStartOffset + 1;
+ if (LlapIoImpl.CACHE_LOGGER.isDebugEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.debug("Cache offsets based on the split - 'first row' at "
+ + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
+ }
+ } else {
+ fileOffset = offsetReader.getCurrentRowEndOffset();
+ assert firstStartOffset >= 0 && lastStartOffset >= 0 && fileOffset >= 0;
+ }
+ writer.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ // Close the writer to finalize the metadata.
+ writer.close();
+ result = new Vectors(writer.extractCurrentVrbs());
+ } else {
+ writer.close();
+ }
+ return result;
+ } finally {
+ closeOffsetReader();
+ }
+ } catch (Exception ex) {
+ closeOffsetReader();
+ throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex);
+ }
+ }
+
+ private void closeOffsetReader() {
+ if (offsetReader == null) return;
+ try {
+ offsetReader.close();
+ } catch (Exception ex) {
+ LlapIoImpl.LOG.error("Failed to close source reader", ex);
+ }
+ offsetReader = null;
+ }
+ }
- public void readSplitFromFile(FileSplit split, boolean[] splitIncludes, StripeData slice)
- throws IOException {
+ public void startReadSplitFromFile(
+ FileSplit split, boolean[] splitIncludes, StripeData slice) throws IOException {
boolean maySplitTheSplit = slice == null;
ReaderWithOffsets offsetReader = null;
@SuppressWarnings("rawtypes")
RecordReader sourceReader = sourceInputFormat.getRecordReader(split, jobConf, reporter);
try {
offsetReader = createOffsetReader(sourceReader);
- maySplitTheSplit = maySplitTheSplit && offsetReader.hasOffsets();
+ sourceReader = null;
+ } finally {
+ if (sourceReader != null) {
+ try {
+ sourceReader.close();
+ } catch (Exception ex) {
+ LlapIoImpl.LOG.error("Failed to close source reader", ex);
+ }
+ }
+ }
+ maySplitTheSplit = maySplitTheSplit && offsetReader.hasOffsets();
- // writer writes to orcWriter which writes to cacheWriter
- // TODO: in due course, writer will also propagate row batches if it's capable
+ try {
StructObjectInspector originalOi = (StructObjectInspector)getOiFromSerDe();
- writer = VertorDeserializeOrcWriter.create(sourceInputFormat, sourceSerDe, parts,
- daemonConf, jobConf, split.getPath(), originalOi, columnIds);
- cacheWriter = new CacheWriter(
- bufferManager, allocSize, columnIds, splitIncludes, writer.hasIncludes());
- writer.init(OrcFile.createWriter(split.getPath(),
- createOrcWriterOptions(writer.getDestinationOi())));
-
- int rowsPerSlice = 0;
- long currentKnownTornStart = split.getStart();
- long lastStartOffset = Long.MIN_VALUE, firstStartOffset = Long.MIN_VALUE;
- boolean hasData = false;
- while (offsetReader.next()) {
- hasData = true;
- Writable value = offsetReader.getCurrentRow();
- lastStartOffset = offsetReader.getCurrentRowStartOffset();
- if (firstStartOffset == Long.MIN_VALUE) {
- firstStartOffset = lastStartOffset;
- }
- writer.writeOneRow(value);
-
- if (maySplitTheSplit && ++rowsPerSlice == targetSliceRowCount) {
- assert offsetReader.hasOffsets();
- writer.flushIntermediateData();
- long fileOffset = offsetReader.getCurrentRowEndOffset();
- // Must support offsets to be able to split.
- if (firstStartOffset < 0 || lastStartOffset < 0 || fileOffset < 0) {
- throw new AssertionError("Unable to get offsets from "
- + offsetReader.getClass().getSimpleName());
- }
- cacheWriter.setCurrentStripeOffsets(
- currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
- // Split starting at row start will not read that row.
- currentKnownTornStart = lastStartOffset;
- // Row offsets will be determined from the reader (we could set the first from last).
- lastStartOffset = Long.MIN_VALUE;
- firstStartOffset = Long.MIN_VALUE;
- rowsPerSlice = 0;
- writer.writeIntermediateFooter();
- }
+ List<Integer> splitColumnIds = OrcInputFormat.genIncludedColumnsReverse(
+ schema, splitIncludes, false);
+ // The file read feeds the writer, which writes to orcWriter, which writes to cacheWriter.
+ EncodingWriter writer = VertorDeserializeOrcWriter.create(
+ sourceInputFormat, sourceSerDe, parts, daemonConf, jobConf, split.getPath(), originalOi,
+ splitColumnIds, splitIncludes, allocSize);
+ // TODO: move this into ctor? EW would need to create CacheWriter then
+ List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds;
+ writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes,
+ writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath());
+ if (writer instanceof VertorDeserializeOrcWriter) {
+ VertorDeserializeOrcWriter asyncWriter = (VertorDeserializeOrcWriter)writer;
+ asyncWriter.startAsync(new AsyncCacheDataCallback());
+ this.asyncWriters.add(asyncWriter);
}
- if (rowsPerSlice > 0 || (!maySplitTheSplit && hasData)) {
- long fileOffset = -1;
- if (!offsetReader.hasOffsets()) {
- // The reader doesn't support offsets. We adjust offsets to match future splits.
- // If cached split was starting at row start, that row would be skipped, so +1
- firstStartOffset = split.getStart() + 1;
- // Last row starting at the end of the split would be read.
- lastStartOffset = split.getStart() + split.getLength();
- // However, it must end after the split end, otherwise the next one would have been read.
- fileOffset = lastStartOffset + 1;
- if (LlapIoImpl.CACHE_LOGGER.isDebugEnabled()) {
- LlapIoImpl.CACHE_LOGGER.debug("Cache offsets based on the split - 'first row' at "
- + firstStartOffset + "; 'last row' at " + lastStartOffset + ", " + fileOffset);
- }
- } else {
- fileOffset = offsetReader.getCurrentRowEndOffset();
- assert firstStartOffset >= 0 && lastStartOffset >= 0 && fileOffset >= 0;
+ currentFileRead = new FileReaderYieldReturn(
+ offsetReader, split, writer, maySplitTheSplit, targetSliceRowCount);
+ } finally {
+ // Assignment is the last thing in the try, so if it happened, we assume success.
+ if (currentFileRead != null) return;
+ if (offsetReader == null) return;
+ try {
+ offsetReader.close();
+ } catch (Exception ex) {
+ LlapIoImpl.LOG.error("Failed to close source reader", ex);
+ }
+ }
+ }
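
Taken together, startReadSplitFromFile sets up a chain: the source record reader (wrapped for offsets) feeds an EncodingWriter, which encodes through an ORC Writer whose physical writer is the CacheWriter. A simplified sketch of that wiring, using the signatures introduced in this patch but omitting slicing and torn-row offset bookkeeping:

    // Simplified wiring sketch (no slicing, no offset tracking); all calls
    // below exist in this patch: ReaderWithOffsets.next()/getCurrentRow(),
    // EncodingWriter.init()/writeOneRow()/close().
    void encodeSplit(ReaderWithOffsets reader, EncodingWriter writer,
        CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
      writer.init(cacheWriter, conf, path); // creates the ORC writer over cacheWriter
      while (reader.next()) {
        writer.writeOneRow(reader.getCurrentRow());
      }
      writer.close(); // finalizes ORC metadata into the cache
    }
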
+
+ private class AsyncCacheDataCallback implements AsyncCallback {
+ @Override
+ public void onComplete(VertorDeserializeOrcWriter writer) {
+ CacheWriter cacheWriter = null;
+ try {
+ cacheWriter = writer.getCacheWriter();
+ // What we were reading from disk originally.
+ boolean[] cacheIncludes = writer.getOriginalCacheIncludes();
+ Iterator<CacheWriter.CacheStripeData> iter = cacheWriter.stripes.iterator();
+ while (iter.hasNext()) {
+ processAsyncCacheData(iter.next(), cacheIncludes);
+ iter.remove();
}
- cacheWriter.setCurrentStripeOffsets(
- currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ } catch (IOException e) {
+ LlapIoImpl.LOG.error("Failed to cache async data", e);
+ } finally {
+ cacheWriter.discardData();
}
- // Close the writer to finalize the metadata. No catch since we cannot go on if this throws.
- writer.close();
- writer = null;
- } finally {
- // We don't need the source reader anymore.
- if (offsetReader != null) {
+ }
+ }
+
+ // TODO: this interface is ugly. The two implementations are so far apart feature-wise
+ // after all the perf changes that we might as well hardcode them separately.
+ static abstract class EncodingWriter {
+ protected Writer orcWriter;
+ protected CacheWriter cacheWriter;
+ protected final StructObjectInspector sourceOi;
+ private final int allocSize;
+
+ public EncodingWriter(StructObjectInspector sourceOi, int allocSize) {
+ this.sourceOi = sourceOi;
+ this.allocSize = allocSize;
+ }
+
+ public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+ this.orcWriter = createOrcWriter(cacheWriter, conf, path, sourceOi);
+ this.cacheWriter = cacheWriter;
+ }
+
+ public CacheWriter getCacheWriter() {
+ return cacheWriter;
+ }
+ public abstract boolean isOnlyWritingIncludedColumns();
+
+ public abstract void writeOneRow(Writable row) throws IOException;
+ public abstract void setCurrentStripeOffsets(long currentKnownTornStart,
+ long firstStartOffset, long lastStartOffset, long fileOffset);
+ public abstract void flushIntermediateData() throws IOException;
+ public abstract void writeIntermediateFooter() throws IOException;
+ public abstract List<VectorizedRowBatch> extractCurrentVrbs();
+ public void close() throws IOException {
+ if (orcWriter != null) {
try {
- offsetReader.close();
+ orcWriter.close();
+ orcWriter = null;
} catch (Exception ex) {
- LlapIoImpl.LOG.error("Failed to close source reader", ex);
+ LlapIoImpl.LOG.error("Failed to close ORC writer", ex);
}
- } else {
- assert sourceReader != null;
+ }
+ if (cacheWriter != null) {
try {
- sourceReader.close();
+ cacheWriter.discardData();
+ cacheWriter = null;
} catch (Exception ex) {
- LlapIoImpl.LOG.error("Failed to close source reader", ex);
+ LlapIoImpl.LOG.error("Failed to close cache writer", ex);
}
}
}
- }
- interface EncodingWriter {
- void writeOneRow(Writable row) throws IOException;
- StructObjectInspector getDestinationOi();
- void init(Writer orcWriter);
- boolean hasIncludes();
- void writeIntermediateFooter() throws IOException;
- void flushIntermediateData() throws IOException;
- void close() throws IOException;
+ protected Writer createOrcWriter(CacheWriter cacheWriter, Configuration conf,
+ Path path, StructObjectInspector oi) throws IOException {
+ // TODO: this is currently broken. We need to set the memory manager to a bogus
+ // implementation to keep the memory manager from actually tracking the usage.
+ return OrcFile.createWriter(path, createOrcWriterOptions(
+ sourceOi, conf, cacheWriter, allocSize));
+ }
}
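
To make the abstract contract above concrete, here is a minimal hypothetical subclass, for illustration only (the two real implementations follow). It assumes a row-at-a-time addRow call on the underlying ORC Writer and relies on the same imports as the surrounding file.

    // Illustration only: the smallest EncodingWriter satisfying the contract.
    static class PassThroughWriter extends EncodingWriter {
      public PassThroughWriter(StructObjectInspector sourceOi, int allocSize) {
        super(sourceOi, allocSize);
      }
      @Override public boolean isOnlyWritingIncludedColumns() {
        return false; // writes all columns, like DeserializerOrcWriter
      }
      @Override public void writeOneRow(Writable row) throws IOException {
        orcWriter.addRow(row); // assumed row-at-a-time ORC API
      }
      @Override public void setCurrentStripeOffsets(long currentKnownTornStart,
          long firstStartOffset, long lastStartOffset, long fileOffset) {
        cacheWriter.setCurrentStripeOffsets(
            currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
      }
      @Override public void flushIntermediateData() throws IOException { }
      @Override public void writeIntermediateFooter() throws IOException {
        orcWriter.writeIntermediateFooter();
      }
      @Override public List<VectorizedRowBatch> extractCurrentVrbs() {
        return null; // no vectorization in this sketch
      }
    }
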
- static class DeserialerOrcWriter implements EncodingWriter {
- private Writer orcWriter;
+ static class DeserializerOrcWriter extends EncodingWriter {
private final Deserializer sourceSerDe;
- private final StructObjectInspector sourceOi;
- public DeserialerOrcWriter(Deserializer sourceSerDe, StructObjectInspector sourceOi) {
+ public DeserializerOrcWriter(
+ Deserializer sourceSerDe, StructObjectInspector sourceOi, int allocSize) {
+ super(sourceOi, allocSize);
this.sourceSerDe = sourceSerDe;
- this.sourceOi = sourceOi;
- }
-
- @Override
- public void init(Writer orcWriter) {
- this.orcWriter = orcWriter;
}
@Override
@@ -1152,22 +1516,30 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
@Override
- public boolean hasIncludes() {
+ public boolean isOnlyWritingIncludedColumns() {
return false; // LazySimpleSerDe doesn't support projection.
}
@Override
- public StructObjectInspector getDestinationOi() {
- return sourceOi;
+ public void setCurrentStripeOffsets(long currentKnownTornStart,
+ long firstStartOffset, long lastStartOffset, long fileOffset) {
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ }
+
+ @Override
+ public List<VectorizedRowBatch> extractCurrentVrbs() {
+ return null; // Doesn't support creating VRBs.
}
}
- private WriterOptions createOrcWriterOptions(ObjectInspector sourceOi) throws IOException {
- return OrcFile.writerOptions(daemonConf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
+ static WriterOptions createOrcWriterOptions(ObjectInspector sourceOi,
+ Configuration conf, CacheWriter cacheWriter, int allocSize) throws IOException {
+ return OrcFile.writerOptions(conf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE)
.rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG per split
.blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT)
.encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi)
- .physicalWriter(cacheWriter);
+ .physicalWriter(cacheWriter).bufferSize(allocSize);
}
private ObjectInspector getOiFromSerDe() throws IOException {
@@ -1178,7 +1550,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
}
- private ReaderWithOffsets createOffsetReader(RecordReader sourceReader) {
+ private ReaderWithOffsets createOffsetReader(RecordReader<?, ?> sourceReader) {
if (LlapIoImpl.LOG.isDebugEnabled()) {
LlapIoImpl.LOG.debug("Using " + sourceReader.getClass().getSimpleName() + " to read data");
}
@@ -1206,9 +1578,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
}
private boolean sendEcbToConsumer(OrcEncodedColumnBatch ecb,
- boolean hasCachedSlice, CacheWriter.CacheStripeData writerData) {
+ boolean hasCachedSlice, CacheWriter.CacheStripeData diskData) {
if (ecb == null) { // This basically means stop has been called.
- cleanupReaders();
+ cleanup(true);
return false;
}
LlapIoImpl.LOG.trace("Sending a batch over to consumer");
@@ -1216,29 +1588,32 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
if (hasCachedSlice) {
cachedData.getData().remove(0); // See javadoc - no need to clean up the cache data anymore.
}
- if (writerData != null) {
- writerData.colStreams.clear();
+ if (diskData != null) {
+ diskData.colStreams.clear();
}
return true;
}
-
- private void cleanupReaders() {
- if (writer != null) {
+ private void cleanup(boolean isError) {
+ cleanUpCurrentRead();
+ if (!isError) return;
+ for (VertorDeserializeOrcWriter asyncWriter : asyncWriters) {
try {
- writer.close();
- writer = null;
+ asyncWriter.interrupt();
} catch (Exception ex) {
- LlapIoImpl.LOG.error("Failed to close ORC writer", ex);
+ LlapIoImpl.LOG.warn("Failed to interrupt an async writer", ex);
}
}
- if (cacheWriter != null) {
- try {
- cacheWriter.discardData();
- cacheWriter = null;
- } catch (Exception ex) {
- LlapIoImpl.LOG.error("Failed to close cache writer", ex);
- }
+ asyncWriters.clear();
+ }
+
+ private void cleanUpCurrentRead() {
+ if (currentFileRead == null) return;
+ try {
+ currentFileRead.closeOffsetReader();
+ currentFileRead = null;
+ } catch (Exception ex) {
+ LlapIoImpl.LOG.error("Failed to close current file reader", ex);
}
}
@@ -1249,7 +1624,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
private boolean processStop() {
if (!isStopped) return false;
LlapIoImpl.LOG.info("SerDe-based data reader is stopping");
- cleanupReaders();
+ cleanup(true);
return true;
}
@@ -1262,11 +1637,11 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void>
return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
}
- // TODO: move to a superclass?
@Override
public void returnData(OrcEncodedColumnBatch ecb) {
for (int colIx = 0; colIx < ecb.getTotalColCount(); ++colIx) {
if (!ecb.hasData(colIx)) continue;
+ // TODO: reuse columnvector-s on hasBatch - save the array by column? take apart each list.
ColumnStreamData[] datas = ecb.getColumnData(colIx);
for (ColumnStreamData data : datas) {
if (data == null || data.decRef() != 0) continue;
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
index 63a3be2..86d9ecc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hive.llap.io.encoded;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -30,7 +32,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserialerOrcWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter;
+import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter;
import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
@@ -57,73 +60,90 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
/** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */
-class VertorDeserializeOrcWriter implements EncodingWriter {
+class VertorDeserializeOrcWriter extends EncodingWriter implements Runnable {
+ private final VectorizedRowBatchCtx vrbCtx;
private Writer orcWriter;
private final LazySimpleDeserializeRead deserializeRead;
private final VectorDeserializeRow<?> vectorDeserializeRow;
- private final VectorizedRowBatch sourceBatch, destinationBatch;
- private final boolean hasIncludes;
private final StructObjectInspector destinationOi;
+ private final boolean usesSourceIncludes;
+ private final List<Integer> sourceIncludes;
+
+ private final boolean isAsync;
+ private final Thread orcThread;
+ private final ConcurrentLinkedQueue<WriteOperation> queue;
+ private AsyncCallback completion;
+
+ // Stored here only as async operation context.
+ private final boolean[] cacheIncludes;
+
+ private VectorizedRowBatch sourceBatch, destinationBatch;
+ private List<VectorizedRowBatch> currentBatches;
// TODO: if more writers are added, separate out an EncodingWriterFactory
public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe,
- Map<Path, PartitionDesc> parts, Configuration daemonConf,
- Configuration jobConf, Path splitPath, StructObjectInspector sourceOi,
- List<Integer> includes) throws IOException {
+ Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf,
+ Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes,
+ boolean[] cacheIncludes, int allocSize) throws IOException {
// Vector SerDe can be disabled both on client and server side.
if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
|| !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED)
|| !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) {
- return new DeserialerOrcWriter(serDe, sourceOi);
+ return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
}
Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath);
PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
parts, path, null);
if (partDesc == null) {
LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path);
- return new DeserialerOrcWriter(serDe, sourceOi);
+ return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
}
Properties tblProps = partDesc.getTableDesc().getProperties();
if ("true".equalsIgnoreCase(tblProps.getProperty(
serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) {
LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to "
+ serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST);
- return new DeserialerOrcWriter(serDe, sourceOi);
+ return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
}
for (StructField sf : sourceOi.getAllStructFieldRefs()) {
Category c = sf.getFieldObjectInspector().getCategory();
if (c != Category.PRIMITIVE) {
LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported");
- return new DeserialerOrcWriter(serDe, sourceOi);
+ return new DeserializerOrcWriter(serDe, sourceOi, allocSize);
}
}
LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path);
- return new VertorDeserializeOrcWriter(daemonConf, tblProps, sourceOi, includes);
+ return new VertorDeserializeOrcWriter(
+ daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize);
}
private VertorDeserializeOrcWriter(Configuration conf, Properties tblProps,
- StructObjectInspector sourceOi, List<Integer> columnIds) throws IOException {
+ StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes,
+ int allocSize) throws IOException {
+ super(sourceOi, allocSize);
// See also: the usage of VectorDeserializeType, for binary. For now, we only want text.
- VectorizedRowBatchCtx vrbCtx = createVrbCtx(sourceOi);
+ this.vrbCtx = createVrbCtx(sourceOi);
+ this.sourceIncludes = sourceIncludes;
+ this.cacheIncludes = cacheIncludes;
this.sourceBatch = vrbCtx.createVectorizedRowBatch();
deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(),
/* useExternalBuffer */ true, createSerdeParams(conf, tblProps));
vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead);
int colCount = vrbCtx.getRowColumnTypeInfos().length;
boolean[] includes = null;
- this.hasIncludes = columnIds.size() < colCount;
- if (hasIncludes) {
+ this.usesSourceIncludes = sourceIncludes.size() < colCount;
+ if (usesSourceIncludes) {
// VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the
// "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them.
// In any case, we would also need to build a new OI for OrcWriter config.
// This is why OrcWriter is created after this writer, by the way.
- this.destinationBatch = new VectorizedRowBatch(columnIds.size());
+ this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
includes = new boolean[colCount];
int inclBatchIx = 0;
- List<String> childNames = new ArrayList<>(columnIds.size());
- List<ObjectInspector> childOis = new ArrayList<>(columnIds.size());
+ List<String> childNames = new ArrayList<>(sourceIncludes.size());
+ List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size());
List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs();
- for (Integer columnId : columnIds) {
+ for (Integer columnId : sourceIncludes) {
includes[columnId] = true;
assert inclBatchIx <= columnId;
// Note that we use the same vectors in both batches. Clever, very clever.
@@ -135,7 +155,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
// This is only used by ORC to derive the structure. Most fields are unused.
destinationOi = new LazySimpleStructObjectInspector(
childNames, childOis, null, (byte)0, null);
- destinationBatch.setPartitionInfo(columnIds.size(), 0);
+ destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
if (LlapIoImpl.LOG.isDebugEnabled()) {
LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes));
}
@@ -154,6 +174,23 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
throw new IOException(e);
}
}
+ this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED);
+ if (isAsync) {
+ currentBatches = new LinkedList<>();
+ queue = new ConcurrentLinkedQueue<>();
+ orcThread = new Thread(this);
+ orcThread.setDaemon(true);
+ orcThread.setName(Thread.currentThread().getName() + "-OrcEncode");
+ } else {
+ queue = null;
+ orcThread = null;
+ currentBatches = null;
+ }
+ }
+
+ public void startAsync(AsyncCallback callback) {
+ this.completion = callback;
+ this.orcThread.start();
}
private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException {
@@ -176,13 +213,57 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
}
@Override
- public boolean hasIncludes() {
- return hasIncludes;
+ public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException {
+ this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi);
+ this.cacheWriter = cacheWriter;
+ }
+
+ public interface AsyncCallback {
+ void onComplete(VertorDeserializeOrcWriter writer);
}
@Override
- public StructObjectInspector getDestinationOi() {
- return destinationOi;
+ public void run() {
+ while (true) {
+ WriteOperation op = null;
+ int fallbackMs = 8;
+ while (true) {
+ op = queue.poll();
+ if (op != null) break;
+ if (fallbackMs > 262144) { // Arbitrary cap; we don't expect the caller to wait 7+ minutes.
+ LlapIoImpl.LOG.error("ORC encoder timed out waiting for input");
+ discardData();
+ return;
+ }
+ try {
+ Thread.sleep(fallbackMs);
+ } catch (InterruptedException e) {
+ LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input");
+ discardData();
+ return;
+ }
+ fallbackMs <<= 1;
+ }
+ try {
+ if (op.apply(orcWriter, cacheWriter)) {
+ LlapIoImpl.LOG.info("ORC encoder received a exit event");
+ completion.onComplete(this);
+ return;
+ }
+ } catch (Exception e) {
+ LlapIoImpl.LOG.error("ORC encoder failed", e);
+ discardData();
+ return;
+ }
+ }
+ }
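
The loop above is a poll with exponential backoff: the sleep doubles from 8 ms, and the thread gives up once the delay exceeds 262144 ms, i.e. after roughly eight and a half minutes of cumulative waiting. The same pattern in isolation, with generic names that are not part of the patch:

    // Standalone backoff sketch: returns the next item, or null on timeout.
    static <T> T pollWithBackoff(java.util.Queue<T> queue, int startMs, int capMs)
        throws InterruptedException {
      int delayMs = startMs;
      while (true) {
        T item = queue.poll();
        if (item != null) return item;
        if (delayMs > capMs) return null; // cumulative wait is about 2 * capMs
        Thread.sleep(delayMs);
        delayMs <<= 1; // double the sleep between empty polls
      }
    }
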
+
+ private void discardData() {
+ try {
+ cacheWriter.discardData();
+ } catch (Exception ex) {
+ LlapIoImpl.LOG.error("Failed to close an async cache writer", ex);
+ }
}
@Override
@@ -196,7 +277,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
// Deserialize and append new row using the current batch size as the index.
try {
- // TODO: can we use ByRef? Probably not, need to see text record reader.
+ // Not using ByRef now since it's unsafe for text readers. Might be safe for others.
vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++);
} catch (Exception e) {
throw new IOException("DeserializeRead detail: "
@@ -206,19 +287,36 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
private void flushBatch() throws IOException {
addBatchToWriter();
-
- for (int c = 0; c < sourceBatch.cols.length; ++c) {
- // This resets vectors in both batches.
- ColumnVector colVector = sourceBatch.cols[c];
- if (colVector != null) {
- colVector.reset();
- colVector.init();
+ if (!isAsync) {
+ for (int c = 0; c < sourceBatch.cols.length; ++c) {
+ // This resets vectors in both batches.
+ ColumnVector colVector = sourceBatch.cols[c];
+ if (colVector != null) {
+ colVector.reset();
+ colVector.init();
+ }
+ }
+ sourceBatch.selectedInUse = false;
+ sourceBatch.size = 0;
+ sourceBatch.endOfFile = false;
+ propagateSourceBatchFieldsToDest();
+ } else {
+ // In addBatchToWriter, we have passed the batch to both the ORC and operator
+ // pipelines (neither ever changes the vectors), so we need a fresh batch of
+ // vectors to write to.
+ // TODO: for now, create this from scratch. Ideally we should return the vectors from ops.
+ // We could also have the ORC thread create it for us in its spare time...
+ this.sourceBatch = vrbCtx.createVectorizedRowBatch();
+ if (usesSourceIncludes) {
+ this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size());
+ int inclBatchIx = 0;
+ for (Integer columnId : sourceIncludes) {
+ destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId];
+ }
+ destinationBatch.setPartitionInfo(sourceIncludes.size(), 0);
+ } else {
+ this.destinationBatch = sourceBatch;
}
}
- sourceBatch.selectedInUse = false;
- sourceBatch.size = 0;
- sourceBatch.endOfFile = false;
- propagateSourceBatchFieldsToDest();
}
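
The branch above is the key difference between the two modes: in synchronous mode the batch can be reset and reused in place, while in async mode the old batch now belongs to the ORC thread and a fresh one must be allocated. A condensed sketch of that decision (the method name is ours, not the patch's):

    // Condensed sketch of the reuse-vs-reallocate choice made in flushBatch().
    VectorizedRowBatch nextSourceBatch(boolean isAsync, VectorizedRowBatch current,
        VectorizedRowBatchCtx vrbCtx) {
      if (!isAsync) {
        for (ColumnVector cv : current.cols) {
          if (cv != null) { cv.reset(); cv.init(); } // reuse the same vectors
        }
        current.selectedInUse = false;
        current.size = 0;
        return current;
      }
      // The ORC thread still holds the old batch; never touch it again here.
      return vrbCtx.createVectorizedRowBatch();
    }
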
private void propagateSourceBatchFieldsToDest() {
@@ -230,8 +328,12 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
private void addBatchToWriter() throws IOException {
propagateSourceBatchFieldsToDest();
- // LlapIoImpl.LOG.info("Writing includeOnlyBatch " + s + "; data "+ includeOnlyBatch);
- orcWriter.addRowBatch(destinationBatch);
+ if (!isAsync) {
+ orcWriter.addRowBatch(destinationBatch);
+ } else {
+ currentBatches.add(destinationBatch);
+ addWriteOp(new VrbOperation(destinationBatch));
+ }
}
@Override
@@ -243,7 +345,28 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
@Override
public void writeIntermediateFooter() throws IOException {
- orcWriter.writeIntermediateFooter();
+ if (isAsync) {
+ addWriteOp(new IntermediateFooterOperation());
+ } else {
+ orcWriter.writeIntermediateFooter();
+ }
+ }
+
+ private void addWriteOp(WriteOperation wo) throws AssertionError {
+ if (queue.offer(wo)) return;
+ throw new AssertionError("Queue full"); // This should never happen with linked list queue.
+ }
+
+ @Override
+ public void setCurrentStripeOffsets(long currentKnownTornStart,
+ long firstStartOffset, long lastStartOffset, long fileOffset) {
+ if (isAsync) {
+ addWriteOp(new SetStripeDataOperation(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset));
+ } else {
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ }
}
@Override
@@ -251,11 +374,85 @@ class VertorDeserializeOrcWriter implements EncodingWriter {
if (sourceBatch.size > 0) {
addBatchToWriter();
}
- orcWriter.close();
+ if (!isAsync) {
+ orcWriter.close();
+ } else {
+ addWriteOp(new CloseOperation());
+ }
+ }
+
+ public List<VectorizedRowBatch> extractCurrentVrbs() {
+ if (!isAsync) return null;
+ List<VectorizedRowBatch> result = currentBatches;
+ currentBatches = new LinkedList<>();
+ return result;
+ }
+
+ private static interface WriteOperation {
+ boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException;
+ }
+
+ private static class VrbOperation implements WriteOperation {
+ private VectorizedRowBatch batch;
+
+ public VrbOperation(VectorizedRowBatch batch) {
+ // LlapIoImpl.LOG.debug("Adding batch " + batch);
+ this.batch = batch;
+ }
+
+ @Override
+ public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+ // LlapIoImpl.LOG.debug("Writing batch " + batch);
+ writer.addRowBatch(batch);
+ return false;
+ }
+ }
+
+ private static class IntermediateFooterOperation implements WriteOperation {
+ @Override
+ public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+ writer.writeIntermediateFooter();
+ return false;
+ }
+ }
+
+ private static class SetStripeDataOperation implements WriteOperation {
+ private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset;
+ public SetStripeDataOperation(long currentKnownTornStart,
+ long firstStartOffset, long lastStartOffset, long fileOffset) {
+ this.currentKnownTornStart = currentKnownTornStart;
+ this.firstStartOffset = firstStartOffset;
+ this.lastStartOffset = lastStartOffset;
+ this.fileOffset = fileOffset;
+ }
+
+ @Override
+ public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+ cacheWriter.setCurrentStripeOffsets(
+ currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset);
+ return false;
+ }
+ }
+
+ private static class CloseOperation implements WriteOperation {
+ @Override
+ public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException {
+ writer.close();
+ return true; // The thread should stop after this.
+ }
+ }
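
The WriteOperation hierarchy is a small command queue: the reader thread enqueues operations and the ORC thread applies them in order until one returns true, which only CloseOperation does. A condensed sketch of the consumer side, using the types from this patch (the method name is an assumption):

    // Consumer-side sketch: apply queued commands until one signals shutdown.
    static void applyAll(java.util.Queue<WriteOperation> queue,
        Writer orcWriter, CacheWriter cacheWriter) throws IOException {
      WriteOperation op;
      while ((op = queue.poll()) != null) {
        if (op.apply(orcWriter, cacheWriter)) {
          return; // CloseOperation: the encoder thread stops here
        }
      }
    }
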
+
+ public boolean[] getOriginalCacheIncludes() {
+ return cacheIncludes;
}
@Override
- public void init(Writer orcWriter) {
- this.orcWriter = orcWriter;
+ public boolean isOnlyWritingIncludedColumns() {
+ return usesSourceIncludes;
+ }
+
+ public void interrupt() {
+ assert orcThread != null;
+ orcThread.interrupt();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f273cc5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 99cc506..369584b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -350,6 +350,45 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
/**
+ * Reverses genIncludedColumns; produces the table columns indexes from ORC included columns.
+ * @param readerSchema The ORC reader schema for the table.
+ * @param included The included ORC columns.
+ * @param isFullColumnMatch Whether a full column match should be enforced (i.e. whether to
+ * expect that all the sub-columns of a complex type column are included or excluded
+ * together in the included array). If false, any sub-column being included for a complex
+ * type is sufficient for the entire complex column to be included in the result.
+ * @return The list of table column indexes.
+ */
+ public static List<Integer> genIncludedColumnsReverse(
+ TypeDescription readerSchema, boolean[] included, boolean isFullColumnMatch) {
+ assert included != null;
+ List<Integer> result = new ArrayList<>();
+ List<TypeDescription> children = readerSchema.getChildren();
+ for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) {
+ TypeDescription child = children.get(columnNumber);
+ int id = child.getId();
+ int maxId = child.getMaximumId();
+ if (id >= included.length || maxId >= included.length) {
+ throw new AssertionError("Inconsistent includes: " + included.length
+ + " elements; found column ID " + id);
+ }
+ boolean isIncluded = included[id];
+ for (int col = id + 1; col <= maxId; ++col) {
+ if (isFullColumnMatch && included[col] != isIncluded) {
+ throw new AssertionError("Inconsistent includes: root column IDs are [" + id + ", "
+ + maxId + "]; included[" + col + "] = " + included[col] + ", which is different "
+ + " from the previous IDs of the same root column.");
+ }
+ isIncluded = isIncluded || included[col];
+ }
+ if (isIncluded) {
+ result.add(columnNumber);
+ }
+ }
+ return result;
+ }
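
A worked example of the reverse mapping for a flat schema, where the root struct is ORC column ID 0 and its children are IDs 1..3:

    // struct<a:int,b:int,c:int>: ORC column IDs are root=0, a=1, b=2, c=3.
    TypeDescription schema = TypeDescription.fromString("struct<a:int,b:int,c:int>");
    boolean[] included = { false, true, false, true }; // include ORC IDs 1 and 3
    List<Integer> cols = OrcInputFormat.genIncludedColumnsReverse(schema, included, false);
    // cols == [0, 2]: ORC IDs 1 and 3 map back to table columns a and c.
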
+
+ /**
* Take the configuration and figure out which columns we need to include.
* @param readerSchema the types for the reader
* @param conf the configuration