Posted to commits@hive.apache.org by se...@apache.org on 2018/01/17 00:43:34 UTC
hive git commit: HIVE-18452 : work around HADOOP-15171 (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 72684f10d -> 78d5572f8
HIVE-18452 : work around HADOOP-15171 (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/78d5572f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/78d5572f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/78d5572f
Branch: refs/heads/master
Commit: 78d5572f8e165a907c63e1ef8579f591b8c34563
Parents: 72684f1
Author: sergey <se...@apache.org>
Authored: Tue Jan 16 16:43:03 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Tue Jan 16 16:43:03 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../llap/io/encoded/OrcEncodedDataReader.java | 20 +++--
.../ql/io/orc/encoded/EncodedReaderImpl.java | 83 ++++++++++++++++++--
.../hadoop/hive/ql/io/orc/encoded/Reader.java | 2 +-
.../hive/ql/io/orc/encoded/ReaderImpl.java | 4 +-
5 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 854bbdf..f2e927f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1382,6 +1382,8 @@ public class HiveConf extends Configuration {
"while writing a table with ORC file format, enabling this config will do stripe-level\n" +
"fast merge for small ORC files. Note that enabling this config will not honor the\n" +
"padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
+ HIVE_ORC_CODEC_POOL("hive.use.orc.codec.pool", true,
+ "Whether to use codec pool in ORC. Disable if there are bugs with codec reuse."),
HIVEUSEEXPLICITRCFILEHEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 1e0eccf..68bb168 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -161,6 +161,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private final QueryFragmentCounters counters;
private final UserGroupInformation ugi;
private final SchemaEvolution evolution;
+ private final boolean useCodecPool;
// Read state.
private int stripeIxFrom;
@@ -173,6 +174,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
private CompressionCodec codec;
private Object fileKey;
private FileSystem fs;
+
/**
* stripeRgs[stripeIx'] => boolean array (could be a bitmask) of rg-s that need to be read.
* Contains only stripes that are read, and only columns included. null => read all RGs.
@@ -212,6 +214,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
} catch (IOException e) {
throw new RuntimeException(e);
}
+ this.useCodecPool = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL);
// moved this part of code from performDataRead as LlapInputFormat needs to know the file schema
// to decide if schema evolution is supported or not.
@@ -443,7 +446,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
ensureOrcReader();
// Reader creation updates HDFS counters, don't do it here.
DataWrapperForOrc dw = new DataWrapperForOrc();
- stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace);
+ stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, trace,
+ HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL));
stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
}
@@ -739,18 +743,24 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
assert footerRange.next == null; // Can only happen w/zcr for a single input buffer.
if (hasCache) {
- LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(stripeKey, footerRange.getData());
+ LlapBufferOrBuffers cacheBuf = metadataCache.putStripeTail(
+ stripeKey, footerRange.getData().duplicate());
metadataCache.decRefBuffer(cacheBuf); // We don't use this one.
}
- ByteBuffer bb = footerRange.getData();
+ ByteBuffer bb = footerRange.getData().duplicate();
CompressionKind kind = orcReader.getCompressionKind();
- CompressionCodec codec = OrcCodecPool.getCodec(kind);
+ boolean isPool = useCodecPool;
+ CompressionCodec codec = isPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
try {
return buildStripeFooter(Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
bb.remaining(), codec, orcReader.getCompressionSize());
} finally {
- OrcCodecPool.returnCodec(kind, codec);
+ if (isPool) {
+ OrcCodecPool.returnCodec(kind, codec);
+ } else {
+ codec.close();
+ }
}
}
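Two things change in the hunk above: the ByteBuffer handed to the metadata cache is duplicate()-d so the cache entry and the local read no longer share position/limit state, and the codec is either borrowed from OrcCodecPool or created privately and closed. A condensed sketch of the borrow-or-create lifecycle, assuming the ORC package layout of this commit (the wrapper method itself is hypothetical):

    import java.io.IOException;
    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;
    import org.apache.orc.impl.WriterImpl;

    final class CodecLifecycleSketch {
      // Hypothetical helper mirroring the try/finally pattern above.
      static void withCodec(CompressionKind kind, boolean useCodecPool)
          throws IOException {
        CompressionCodec codec = useCodecPool
            ? OrcCodecPool.getCodec(kind)    // shared, possibly reused instance
            : WriterImpl.createCodec(kind);  // private instance, no shared state
        try {
          // ... decompress with the codec ...
        } finally {
          if (useCodecPool) {
            OrcCodecPool.returnCodec(kind, codec); // hand back for reuse
          } else {
            codec.close(); // not pooled: release resources immediately
          }
        }
      }
    }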
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 627e617..555bda7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.OrcConf;
@@ -54,6 +55,7 @@ import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.RecordReaderUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.impl.StreamName.Area;
+import org.apache.orc.impl.WriterImpl;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace.RangesSrc;
@@ -126,6 +128,7 @@ class EncodedReaderImpl implements EncodedReader {
private final DataReader dataReader;
private boolean isDataReaderOpen = false;
private final CompressionCodec codec;
+ private final boolean isCodecFromPool;
private final boolean isCompressed;
private final org.apache.orc.CompressionKind compressionKind;
private final int bufferSize;
@@ -140,11 +143,12 @@ class EncodedReaderImpl implements EncodedReader {
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types,
TypeDescription fileSchema, org.apache.orc.CompressionKind kind, WriterVersion version,
int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
- PoolFactory pf, IoTrace trace) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
this.fileKey = fileKey;
this.compressionKind = kind;
this.isCompressed = kind != org.apache.orc.CompressionKind.NONE;
- this.codec = OrcCodecPool.getCodec(kind);
+ this.isCodecFromPool = useCodecPool;
+ this.codec = useCodecPool ? OrcCodecPool.getCodec(kind) : WriterImpl.createCodec(kind);
this.types = types;
this.fileSchema = fileSchema; // Note: this is redundant with types
this.version = version;
@@ -672,7 +676,11 @@ class EncodedReaderImpl implements EncodedReader {
@Override
public void close() throws IOException {
- OrcCodecPool.returnCodec(compressionKind, codec);
+ if (isCodecFromPool) {
+ OrcCodecPool.returnCodec(compressionKind, codec);
+ } else {
+ codec.close();
+ }
dataReader.close();
}
@@ -1229,14 +1237,61 @@ class EncodedReaderImpl implements EncodedReader {
private static void decompressChunk(
ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
int startPos = dest.position(), startLim = dest.limit();
+ int startSrcPos = src.position(), startSrcLim = src.limit();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Decompressing " + src.remaining() + " bytes to dest buffer pos "
+ + dest.position() + ", limit " + dest.limit());
+ }
codec.decompress(src, dest);
- // Codec resets the position to 0 and limit to correct limit.
dest.position(startPos);
int newLim = dest.limit();
if (newLim > startLim) {
throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
+ ") became [" + dest.position() + ", " + newLim + ")");
}
+ if (dest.remaining() > 0) return;
+
+ // There's a bug in native decompressor. See HADOOP-15171
+ dest.limit(startLim);
+ src.position(startSrcPos);
+ src.limit(startSrcLim);
+ LOG.warn("The codec has produced 0 bytes for " + src.remaining() + " bytes at pos "
+ + src.position() + ", data hash " + src.hashCode() + ": [" + logSomeBytes(src));
+ ByteBuffer srcHeap = ByteBuffer.allocate(src.remaining()),
+ destHeap = ByteBuffer.allocate(dest.remaining());
+ int destHeapPos = destHeap.position();
+ srcHeap.put(src);
+ srcHeap.position(startSrcPos);
+ codec.decompress(srcHeap, destHeap);
+ destHeap.position(destHeapPos);
+ int newLen = destHeap.remaining();
+ LOG.warn("Fell back to JDK decompressor with memcopy; got " + newLen + " bytes");
+ dest.put(destHeap);
+ dest.position(startPos);
+ dest.limit(startPos + newLen);
+ }
+
+ private static String logSomeBytes(ByteBuffer src) {
+ final int max = 500;
+ StringBuilder sb = new StringBuilder();
+ int base = src.position(), end = base + Math.min(max, src.remaining());
+ for (int i = base; i < end; ++i) {
+ if (i != base) {
+ sb.append(' ');
+ }
+ int b = src.get(i) & 0xff;
+ if (b <= 0xf) {
+ sb.append('0');
+ }
+ sb.append(Integer.toHexString(b));
+ }
+ int rem = src.remaining() - max;
+ if (rem > 0) {
+ sb.append(" ... (").append(rem).append(" bytes)]");
+ } else {
+ sb.append("]");
+ }
+ return sb.toString();
}
private void ponderReleaseInitialRefcount(
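The fallback logic above reads as: attempt the normal decompress; if the codec returns but produced zero bytes (the HADOOP-15171 symptom with direct buffers and the native decompressor), restore both buffers and retry on heap copies, which take the pure-Java path. A self-contained sketch of the same shape, assuming the codec contract noted in the removed comment (position reset to 0, limit set to the output end); buffer handling is illustrative, and the sketch rewinds the heap copy with flip() since the copy always starts at offset 0:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.orc.CompressionCodec;

    final class HeapFallbackSketch {
      static void decompressWithFallback(
          ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
        int srcPos = src.position(), srcLim = src.limit();
        int destPos = dest.position(), destLim = dest.limit();
        codec.decompress(src, dest);
        dest.position(destPos);
        if (dest.remaining() > 0) return; // normal path: output was produced

        // HADOOP-15171 symptom: zero bytes out. Restore both buffers and retry
        // on heap buffers, avoiding the buggy native direct-buffer path.
        dest.limit(destLim);
        src.position(srcPos);
        src.limit(srcLim);
        ByteBuffer srcHeap = ByteBuffer.allocate(src.remaining());
        ByteBuffer destHeap = ByteBuffer.allocate(dest.remaining());
        srcHeap.put(src); // consumes src, as in the original
        srcHeap.flip();   // the copy starts at offset 0, so rewind it fully
        codec.decompress(srcHeap, destHeap);
        // Per the codec contract, destHeap is now [0, produced).
        int produced = destHeap.remaining();
        dest.put(destHeap);
        dest.position(destPos);
        dest.limit(destPos + produced);
      }
    }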
@@ -1865,12 +1920,19 @@ class EncodedReaderImpl implements EncodedReader {
if (lastCached != null) {
iter = lastCached;
}
+ if (isTracingEnabled) {
+ traceLogBuffersUsedToParse(csd);
+ }
CodedInputStream cis = CodedInputStream.newInstance(
new IndexStream(csd.getCacheBuffers(), sctx.length));
cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
switch (sctx.kind) {
case ROW_INDEX:
- index.getRowGroupIndex()[colIx] = OrcProto.RowIndex.parseFrom(cis);
+ OrcProto.RowIndex tmp = index.getRowGroupIndex()[colIx]
+ = OrcProto.RowIndex.parseFrom(cis);
+ if (isTracingEnabled) {
+ LOG.trace("Index is " + tmp.toString().replace('\n', ' '));
+ }
break;
case BLOOM_FILTER:
case BLOOM_FILTER_UTF8:
@@ -1911,6 +1973,17 @@ class EncodedReaderImpl implements EncodedReader {
}
}
+ private void traceLogBuffersUsedToParse(ColumnStreamData csd) {
+ String s = "Buffers ";
+ if (csd.getCacheBuffers() != null) {
+ for (MemoryBuffer buf : csd.getCacheBuffers()) {
+ ByteBuffer bb = buf.getByteBufferDup();
+ s += "{" + buf + ", " + bb.remaining() + /* " => " + bb.hashCode() + */"}, ";
+ }
+ }
+ LOG.trace(s);
+ }
+
private DiskRangeList preReadUncompressedStreams(long stripeOffset, ReadContext[] colCtxs,
MutateHelper toRead, IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException {
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index df536ea..7986827 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader {
* @return The reader.
*/
EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace) throws IOException;
+ PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException;
/** The factory that can create (or return) the pools used by encoded reader. */
public interface PoolFactory {
http://git-wip-us.apache.org/repos/asf/hive/blob/78d5572f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index 203ef69..4a5ccaa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -35,8 +35,8 @@ class ReaderImpl extends org.apache.hadoop.hive.ql.io.orc.ReaderImpl implements
@Override
public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
- PoolFactory pf, IoTrace trace) throws IOException {
+ PoolFactory pf, IoTrace trace, boolean useCodecPool) throws IOException {
return new EncodedReaderImpl(fileKey, types, getSchema(), compressionKind, getWriterVersion(),
- bufferSize, rowIndexStride, dataCache, dataReader, pf, trace);
+ bufferSize, rowIndexStride, dataCache, dataReader, pf, trace, useCodecPool);
}
}
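With the interface change above, every call site must now state its codec-pool choice explicitly. A hedged sketch of a caller, assuming the types from this diff (Reader, EncodedReader, IoTrace and Reader.PoolFactory live in org.apache.hadoop.hive.ql.io.orc.encoded); the helper method itself is illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hive.common.io.DataCache;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
    import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
    import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
    import org.apache.orc.DataReader;

    final class EncodedReaderCallSketch {
      // Illustrative helper: read the flag once, then thread it through,
      // as OrcEncodedDataReader does above.
      static EncodedReader open(Reader reader, Object fileKey, DataCache cache,
          DataReader data, Reader.PoolFactory pf, IoTrace trace, HiveConf conf)
          throws IOException {
        boolean useCodecPool = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_CODEC_POOL);
        return reader.encodedReader(fileKey, cache, data, pf, trace, useCodecPool);
      }
    }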