You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2018/05/15 22:40:47 UTC
[27/50] [abbrv] hive git commit: HIVE-19479 : encoded stream seek is incorrect for 0-length RGs in LLAP IO (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-19479 : encoded stream seek is incorrect for 0-length RGs in LLAP IO (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e941bea8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e941bea8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e941bea8
Branch: refs/heads/branch-3.0.0
Commit: e941bea80393e74efd64b07355b2e0fac384f7cc
Parents: 9ebb2ff
Author: sergey <se...@apache.org>
Authored: Fri May 11 12:01:10 2018 -0700
Committer: Vineet Garg <vg...@apache.org>
Committed: Sun May 13 21:03:44 2018 -0700
----------------------------------------------------------------------
.../llap/io/decode/OrcEncodedDataConsumer.java | 3 +
.../ql/io/orc/encoded/EncodedReaderImpl.java | 10 +-
.../orc/encoded/EncodedTreeReaderFactory.java | 118 ++++++++++---------
3 files changed, 70 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index fc0c66a..05282db 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -296,6 +296,9 @@ public class OrcEncodedDataConsumer
ConsumerStripeMetadata stripeMetadata) throws IOException {
PositionProvider[] pps = createPositionProviders(
columnReaders, batch.getBatchKey(), stripeMetadata);
+ if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.ORC_LOGGER.trace("Created pps {}", Arrays.toString(pps));
+ }
if (pps == null) return;
for (int i = 0; i < columnReaders.length; i++) {
TreeReader reader = columnReaders[i];
http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 1d7eceb..348f9df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -435,12 +435,12 @@ class EncodedReaderImpl implements EncodedReader {
try {
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
- if (isTracingEnabled) {
- LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
- + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
- }
- trace.logStartStripeStream(sctx.kind);
if (sctx.stripeLevelStream == null) {
+ if (isTracingEnabled) {
+ LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
+ }
+ trace.logStartStripeStream(sctx.kind);
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
// To avoid buffers being unlocked, run refcount one ahead; so each RG
http://git-wip-us.apache.org/repos/asf/hive/blob/e941bea8/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index 42532f9..646b214 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.orc.CompressionCodec;
import org.apache.orc.TypeDescription;
import org.apache.orc.TypeDescription.Category;
+import org.apache.orc.impl.InStream;
import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.SettableUncompressedStream;
import org.apache.orc.impl.TreeReaderFactory;
@@ -213,6 +214,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
}
+ private static void skipCompressedIndex(boolean isCompressed, PositionProvider index) {
+ if (!isCompressed) return;
+ index.getNext();
+ }
+
protected static class StringStreamReader extends StringTreeReader
implements SettableTreeReader {
private boolean _isFileCompressed;
@@ -260,30 +266,30 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
if (_dataStream != null && _dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
((StringDictionaryTreeReader) reader).getReader().seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
} else {
// DIRECT encoding
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
+ // TODO: why does the original code not just use _dataStream that it passes in as stream?
+ InStream stream = ((StringDirectTreeReader) reader).getStream();
+ // TODO: not clear why this check and skipSeek are needed.
if (_dataStream != null && _dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
- ((StringDirectTreeReader) reader).getStream().seek(index);
+ stream.seek(index);
+ } else {
+ assert stream == _dataStream;
+ skipSeek(index);
}
+ skipCompressedIndex(_isFileCompressed, index);
if (_lengthStream != null && _lengthStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
((StringDirectTreeReader) reader).getLengths().seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
}
}
@@ -830,10 +836,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
stream.seek(index);
}
}
@@ -945,10 +949,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
stream.seek(index);
}
}
@@ -1071,19 +1073,19 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
+ // TODO: not clear why this check and skipSeek are needed.
if (_valueStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
valueStream.seek(index);
+ } else {
+ assert valueStream == _valueStream;
+ skipSeek(index);
}
+ skipCompressedIndex(_isFileCompressed, index);
if (_scaleStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
scaleReader.seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
}
@Override
@@ -1375,30 +1377,29 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
((StringDictionaryTreeReader) reader).getReader().seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
} else {
// DIRECT encoding
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
+ InStream stream = ((StringDirectTreeReader) reader).getStream();
+ // TODO: not clear why this check and skipSeek are needed.
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
- ((StringDirectTreeReader) reader).getStream().seek(index);
+ stream.seek(index);
+ } else {
+ assert stream == _dataStream;
+ skipSeek(index);
}
+ skipCompressedIndex(_isFileCompressed, index);
if (_lengthStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
((StringDirectTreeReader) reader).getLengths().seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
}
}
@@ -1574,30 +1575,29 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
((StringDictionaryTreeReader) reader).getReader().seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
} else {
// DIRECT encoding
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
+ InStream stream = ((StringDirectTreeReader) reader).getStream();
+ // TODO: not clear why this check and skipSeek are needed.
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
- ((StringDirectTreeReader) reader).getStream().seek(index);
+ stream.seek(index);
+ } else {
+ assert stream == _dataStream;
+ skipSeek(index);
}
+ skipCompressedIndex(_isFileCompressed, index);
if (_lengthStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
((StringDirectTreeReader) reader).getLengths().seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
}
}
@@ -1885,19 +1885,19 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
// data stream could be empty stream or already reached end of stream before present stream.
// This can happen if all values in stream are nulls or last row group values are all null.
+ skipCompressedIndex(_isFileCompressed, index);
+ // TODO: not clear why this check and skipSeek are needed.
if (_dataStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
stream.seek(index);
+ } else {
+ assert stream == _dataStream;
+ skipSeek(index);
}
+ skipCompressedIndex(_isFileCompressed, index);
if (lengths != null && _lengthsStream.available() > 0) {
- if (_isFileCompressed) {
- index.getNext();
- }
lengths.seek(index);
- }
+ } // No need to skip seek here, index won't be used anymore.
}
@Override
@@ -2132,6 +2132,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
+ private static void skipSeek(PositionProvider index) {
+ // Must be consistent with uncompressed stream seek in ORC. See call site comments.
+ index.getNext();
+ }
+
+
private static TreeReader createEncodedTreeReader(TypeDescription schema,
List<OrcProto.ColumnEncoding> encodings, OrcEncodedColumnBatch batch,
CompressionCodec codec, TreeReaderFactory.Context context) throws IOException {