Posted to commits@hive.apache.org by se...@apache.org on 2015/07/27 22:48:42 UTC
[1/4] hive git commit: HIVE-11259 : LLAP: clean up ORC dependencies part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/llap 5cd092b8b -> 1e3b59d37
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
index d0295d9..1ce7c0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
@@ -27,55 +27,14 @@ import java.util.ListIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.DiskRange;
-import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheChunkFactory;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
-import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
-import org.apache.hive.common.util.FixedSizedObjectPool;
-import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
import com.google.common.annotations.VisibleForTesting;
public abstract class InStream extends InputStream {
private static final Log LOG = LogFactory.getLog(InStream.class);
- private static final FixedSizedObjectPool<TrackedCacheChunk> TCC_POOL =
- new FixedSizedObjectPool<>(1024, new PoolObjectHelper<TrackedCacheChunk>() {
- @Override
- protected TrackedCacheChunk create() {
- return new TrackedCacheChunk();
- }
- @Override
- protected void resetBeforeOffer(TrackedCacheChunk t) {
- t.reset();
- }
- });
- private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
- new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
- @Override
- protected ProcCacheChunk create() {
- return new ProcCacheChunk();
- }
- @Override
- protected void resetBeforeOffer(ProcCacheChunk t) {
- t.reset();
- }
- });
- final static CacheChunkFactory CC_FACTORY = new CacheChunkFactory() {
- @Override
- public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
- TrackedCacheChunk tcc = TCC_POOL.take();
- tcc.init(buffer, offset, end);
- return tcc;
- }
- };
+
protected final Long fileId;
protected final String name;
protected long length;
@@ -220,49 +179,31 @@ public abstract class InStream extends InputStream {
private static class CompressedStream extends InStream {
private final List<DiskRange> bytes;
private final int bufferSize;
- private LlapMemoryBuffer cacheBuffer;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
private ByteBuffer compressed;
private long currentOffset;
private int currentRange;
private boolean isUncompressedOriginal;
- private final LowLevelCache cache;
- private final boolean doManageBuffers = true;
public CompressedStream(Long fileId, String name, List<DiskRange> input, long length,
- CompressionCodec codec, int bufferSize, LowLevelCache cache) {
+ CompressionCodec codec, int bufferSize) {
super(fileId, name, length);
this.bytes = input;
this.codec = codec;
this.bufferSize = bufferSize;
currentOffset = 0;
currentRange = 0;
- this.cache = cache;
}
- // TODO: This should not be used for main path.
- private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1];
private void allocateForUncompressed(int size, boolean isDirect) {
- if (cache == null) {
- cacheBuffer = null;
- uncompressed = allocateBuffer(size, isDirect);
- } else {
- singleAllocDest[0] = null;
- cache.getAllocator().allocateMultiple(singleAllocDest, size);
- cacheBuffer = singleAllocDest[0];
- uncompressed = cacheBuffer.getByteBufferDup();
- }
+ uncompressed = allocateBuffer(size, isDirect);
}
private void readHeader() throws IOException {
if (compressed == null || compressed.remaining() <= 0) {
seek(currentOffset);
}
- if (cacheBuffer != null) {
- assert compressed == null;
- return; // Next block is ready from cache.
- }
if (compressed.remaining() > OutStream.HEADER_SIZE) {
int b0 = compressed.get() & 0xff;
int b1 = compressed.get() & 0xff;
@@ -293,13 +234,7 @@ public abstract class InStream extends InputStream {
uncompressed.clear();
}
codec.decompress(slice, uncompressed);
- if (cache != null) {
- // this is the inefficient path
- // TODO: this is invalid; base stripe offset should be passed, return value handled.
- //cache.putFileData(fileId, new DiskRange[] { new DiskRange(originalOffset,
- // chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer }, 0);
- }
- }
+ }
} else {
throw new IllegalStateException("Can't read header at " + this);
}
@@ -342,19 +277,10 @@ public abstract class InStream extends InputStream {
@Override
public void close() {
- cacheBuffer = null;
uncompressed = null;
compressed = null;
currentRange = bytes.size();
currentOffset = length;
- if (doManageBuffers) {
- // TODO: this is the inefficient path for now. LLAP will use this differently.
- for (DiskRange range : bytes) {
- if (range instanceof CacheChunk) {
- cache.releaseBuffer(((CacheChunk)range).buffer);
- }
- }
- }
bytes.clear();
}
@@ -439,20 +365,10 @@ public abstract class InStream extends InputStream {
for (DiskRange range : bytes) {
if (range.getOffset() <= desired && desired < range.getEnd()) {
currentRange = i;
- if (range instanceof BufferChunk) {
- cacheBuffer = null;
- compressed = range.getData().duplicate();
- int pos = compressed.position();
- pos += (int)(desired - range.getOffset());
- compressed.position(pos);
- } else {
- compressed = null;
- cacheBuffer = ((CacheChunk)range).buffer;
- uncompressed = cacheBuffer.getByteBufferDup();
- if (desired != range.getOffset()) {
- throw new IOException("Cannot seek into the middle of uncompressed cached data");
- }
- }
+ compressed = range.getData().duplicate();
+ int pos = compressed.position();
+ pos += (int)(desired - range.getOffset());
+ compressed.position(pos);
currentOffset = desired;
return;
}
@@ -463,19 +379,8 @@ public abstract class InStream extends InputStream {
if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
DiskRange range = bytes.get(segments - 1);
currentRange = segments - 1;
- if (range instanceof BufferChunk) {
- cacheBuffer = null;
- compressed = range.getData().duplicate();
- compressed.position(compressed.limit());
- } else {
- compressed = null;
- cacheBuffer = ((CacheChunk)range).buffer;
- uncompressed = cacheBuffer.getByteBufferDup();
- uncompressed.position(uncompressed.limit());
- if (desired != range.getOffset()) {
- throw new IOException("Cannot seek into the middle of uncompressed cached data");
- }
- }
+ compressed = range.getData().duplicate();
+ compressed.position(compressed.limit());
currentOffset = desired;
return;
}
@@ -542,7 +447,7 @@ public abstract class InStream extends InputStream {
for (int i = 0; i < buffers.length; ++i) {
input.add(new BufferChunk(buffers[i], offsets[i]));
}
- return create(fileId, streamName, input, length, codec, bufferSize, null);
+ return create(fileId, streamName, input, length, codec, bufferSize);
}
/**
@@ -561,837 +466,11 @@ public abstract class InStream extends InputStream {
List<DiskRange> input,
long length,
CompressionCodec codec,
- int bufferSize,
- LowLevelCache cache) throws IOException {
+ int bufferSize) throws IOException {
if (codec == null) {
return new UncompressedStream(fileId, name, input, length);
} else {
- return new CompressedStream(fileId, name, input, length, codec, bufferSize, cache);
- }
- }
-
- /** Cache chunk which tracks whether it has been fully read. See
- EncodedReaderImpl class comment about refcounts. */
- // TODO: these classes need some cleanup. Find every cast and field access and change to OO;
- public static class TrackedCacheChunk extends CacheChunk {
- public boolean isReleased = false;
- public TrackedCacheChunk() {
- super(null, -1, -1);
- }
-
- public void init(LlapMemoryBuffer buffer, long offset, long end) {
- this.buffer = buffer;
- this.offset = offset;
- this.end = end;
- }
-
- public void reset() {
- this.buffer = null;
- this.next = this.prev = null;
- this.isReleased = false;
- }
-
- public void handleCacheCollision(LowLevelCache cache,
- LlapMemoryBuffer replacementBuffer, List<LlapMemoryBuffer> cacheBuffers) {
- throw new UnsupportedOperationException();
- }
- }
-
- // TODO: should we pool these? only used for uncompressed.
- private static class UncompressedCacheChunk extends TrackedCacheChunk {
- private BufferChunk chunk;
- private int count;
-
- public UncompressedCacheChunk(BufferChunk bc) {
- super();
- init(null, bc.getOffset(), bc.getEnd());
- chunk = bc;
- count = 1;
- }
-
- public void addChunk(BufferChunk bc) {
- assert bc.getOffset() == this.getEnd();
- this.end = bc.getEnd();
- ++count;
- }
-
- public BufferChunk getChunk() {
- return chunk;
- }
-
- public int getCount() {
- return count;
- }
-
- @Override
- public void handleCacheCollision(LowLevelCache cache,
- LlapMemoryBuffer replacementBuffer, List<LlapMemoryBuffer> cacheBuffers) {
- assert cacheBuffers == null;
- // This is done at pre-read stage where there's nothing special w/refcounts. Just release.
- if (DebugUtils.isTraceCachingEnabled()) {
- LOG.info("Deallocating " + buffer + " due to cache collision during pre-read");
- }
- cache.getAllocator().deallocate(buffer);
- // Replace the buffer in our big range list, as well as in current results.
- this.buffer = replacementBuffer;
- }
-
- public void clear() {
- this.chunk = null;
- this.count = -1;
- }
- }
-
- /**
- * CacheChunk that is pre-created for new cache data; initially, it contains an original disk
- * buffer and an unallocated LlapMemoryBuffer object. Before we expose it, the LMB is allocated,
- * the data is decompressed, and original compressed data is discarded. The chunk lives on in
- * the DiskRange list created for the request, and everyone treats it like regular CacheChunk.
- */
- private static class ProcCacheChunk extends TrackedCacheChunk {
- public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
- ByteBuffer originalData, LlapMemoryBuffer targetBuffer, int originalCbIndex) {
- super.init(targetBuffer, cbStartOffset, cbEndOffset);
- this.isCompressed = isCompressed;
- this.originalData = originalData;
- this.originalCbIndex = originalCbIndex;
- }
-
- @Override
- public void reset() {
- super.reset();
- this.originalData = null;
- }
-
- @Override
- public void handleCacheCollision(LowLevelCache cache, LlapMemoryBuffer replacementBuffer,
- List<LlapMemoryBuffer> cacheBuffers) {
- assert originalCbIndex >= 0;
- // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
- // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
- // is not in cache.
- if (DebugUtils.isTraceCachingEnabled()) {
- LOG.info("Deallocating " + buffer + " due to cache collision");
- }
- cache.getAllocator().deallocate(buffer);
- cache.notifyReused(replacementBuffer);
- // Replace the buffer in our big range list, as well as in current results.
- this.buffer = replacementBuffer;
- cacheBuffers.set(originalCbIndex, replacementBuffer);
- originalCbIndex = -1; // This can only happen once at decompress time.
- }
-
- boolean isCompressed;
- ByteBuffer originalData = null;
- int originalCbIndex;
- }
-
- /**
- * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
- * and remove what we have returned. We will keep iterator as a "hint" point.
- * @param fileName File name for cache keys.
- * @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
- * @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
- * @param cOffset Start offset to decompress.
- * @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
- * @param zcr Zero-copy reader, if any, to release discarded buffers.
- * @param codec Compression codec.
- * @param bufferSize Compressed buffer (CB) size.
- * @param cache Low-level cache to cache new data.
- * @param streamBuffer Stream buffer, to add the results.
- * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
- * they will not be used in future calls (see the class comment in
- * EncodedReaderImpl about refcounts).
- * @param qfCounters
- * @return Last buffer cached during decompression. Cache buffers are never removed from
- * the master list, so they are safe to keep as iterators for various streams.
- */
- // TODO: move to EncodedReaderImpl
- // TODO: this method has too many arguments... perhaps we can clean it up
- public static DiskRangeList readEncodedStream(long fileId, long baseOffset, DiskRangeList start,
- long cOffset, long endCOffset, ZeroCopyReaderShim zcr, CompressionCodec codec,
- int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer, long unlockUntilCOffset,
- long streamOffset, LowLevelCacheCounters qfCounters) throws IOException {
- if (streamBuffer.cacheBuffers == null) {
- streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
- } else {
- streamBuffer.cacheBuffers.clear();
- }
- if (cOffset == endCOffset) return null;
- boolean isCompressed = codec != null;
- List<ProcCacheChunk> toDecompress = null;
- List<ByteBuffer> toRelease = null;
- if (isCompressed) {
- toRelease = zcr == null ? null : new ArrayList<ByteBuffer>();
- toDecompress = new ArrayList<ProcCacheChunk>();
- }
-
- // 1. Find our bearings in the stream. Normally, iter will already point either to where we
- // want to be, or just before. However, RGs can overlap due to encoding, so we may have
- // to return to a previous block.
- DiskRangeList current = findExactPosition(start, cOffset);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " + current);
- }
-
- TrackedCacheChunk lastUncompressed = null;
-
- // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
- lastUncompressed = isCompressed ?
- prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset,
- bufferSize, current, zcr, cache, streamBuffer, toRelease, toDecompress)
- : prepareRangesForUncompressedRead(
- cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, cache, streamBuffer);
-
- // 3. Allocate the buffers, prepare cache keys.
- // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
- // data and some unallocated membufs for decompression. toDecompress contains all the work we
- // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
- // has also been adjusted to point to these buffers instead of compressed data for the ranges.
- if (toDecompress == null) return lastUncompressed; // Nothing to decompress.
-
- LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()];
- DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
- int ix = 0;
- for (ProcCacheChunk chunk : toDecompress) {
- cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
- targetBuffers[ix] = chunk.buffer;
- ++ix;
- }
- cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
-
- // 4. Now decompress (or copy) the data into cache buffers.
- for (ProcCacheChunk chunk : toDecompress) {
- ByteBuffer dest = chunk.buffer.getByteBufferRaw();
- if (chunk.isCompressed) {
- decompressChunk(chunk.originalData, codec, dest);
- } else {
- copyUncompressedChunk(chunk.originalData, dest);
- }
-
- chunk.originalData = null;
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Locking " + chunk.buffer + " due to reuse (after decompression)");
- }
- cache.notifyReused(chunk.buffer);
- }
-
- // 5. Release original compressed buffers to zero-copy reader if needed.
- if (toRelease != null) {
- assert zcr != null;
- for (ByteBuffer buf : toRelease) {
- zcr.releaseBuffer(buf);
- }
- }
-
- // 6. Finally, put uncompressed data to cache.
- long[] collisionMask = cache.putFileData(
- fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, qfCounters);
- processCacheCollisions(
- cache, collisionMask, toDecompress, targetBuffers, streamBuffer.cacheBuffers);
-
- // 7. It may happen that we only use new compression buffers once. Release initial refcounts.
- for (ProcCacheChunk chunk : toDecompress) {
- ponderReleaseInitialRefcount(cache, unlockUntilCOffset, streamOffset, chunk);
- }
-
- return lastUncompressed;
- }
-
- private static TrackedCacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, int bufferSize, DiskRangeList current,
- ZeroCopyReaderShim zcr, LowLevelCache cache, StreamBuffer streamBuffer,
- List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
- if (cOffset > current.getOffset()) {
- // Target compression block is in the middle of the range; slice the range in two.
- current = current.split(cOffset).next;
- }
- long currentOffset = cOffset;
- TrackedCacheChunk lastUncompressed = null;
- while (true) {
- DiskRangeList next = null;
- if (current instanceof TrackedCacheChunk) {
- // 2a. This is a decoded compression buffer, add as is.
- TrackedCacheChunk cc = (TrackedCacheChunk)current;
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Locking " + cc.buffer + " due to reuse");
- }
- boolean canReuse = cache.notifyReused(cc.buffer);
- assert canReuse;
- streamBuffer.cacheBuffers.add(cc.buffer);
- currentOffset = cc.getEnd();
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
- }
- ponderReleaseInitialRefcount(cache, unlockUntilCOffset, streamOffset, cc);
- lastUncompressed = cc;
- next = current.next;
- } else if (current instanceof IncompleteCb) {
- // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Cannot read " + current);
- }
- next = null;
- currentOffset = -1;
- } else {
- // 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
- // several disk ranges, so we might need to combine them.
- BufferChunk bc = (BufferChunk)current;
- ProcCacheChunk newCached = addOneCompressionBuffer(bc, zcr, bufferSize,
- cache, streamBuffer.cacheBuffers, toDecompress, toRelease);
- lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
- next = (newCached != null) ? newCached.next : null;
- currentOffset = (next != null) ? next.getOffset() : -1;
- }
-
- if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
- break;
- }
- current = next;
- }
- return lastUncompressed;
- }
-
- private static TrackedCacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
- long streamOffset, long unlockUntilCOffset, DiskRangeList current, LowLevelCache cache,
- StreamBuffer streamBuffer) throws IOException {
- long currentOffset = cOffset;
- TrackedCacheChunk lastUncompressed = null;
- boolean isFirst = true;
- while (true) {
- DiskRangeList next = null;
- assert current instanceof TrackedCacheChunk;
- lastUncompressed = (TrackedCacheChunk)current;
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Locking " + lastUncompressed.buffer + " due to reuse");
- }
- boolean canReuse = cache.notifyReused(lastUncompressed.buffer);
- assert canReuse;
- if (isFirst) {
- streamBuffer.indexBaseOffset = (int)(lastUncompressed.getOffset() - streamOffset);
- isFirst = false;
- }
- streamBuffer.cacheBuffers.add(lastUncompressed.buffer);
- currentOffset = lastUncompressed.getEnd();
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adding an uncompressed buffer " + lastUncompressed.buffer);
- }
- ponderReleaseInitialRefcount(cache, unlockUntilCOffset, streamOffset, lastUncompressed);
- next = current.next;
- if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
- break;
- }
- current = next;
- }
- return lastUncompressed;
- }
-
- /**
- * To achieve some sort of consistent cache boundaries, we will cache streams deterministically;
- * in segments starting w/stream start, and going for either stream size or maximum allocation.
- * If we are not reading the entire segment's worth of data, then we will not cache the partial
- * RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA
- * to handle just for this case.
- * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
- * allocator. Uncompressed case is not mainline though so let's not complicate it.
- * @param qfCounters
- */
- public static DiskRangeList preReadUncompressedStream(long fileId,
- long baseOffset, DiskRangeList start, long streamOffset, long streamEnd,
- ZeroCopyReaderShim zcr, LowLevelCache cache, LowLevelCacheCounters qfCounters)
- throws IOException {
- if (streamOffset == streamEnd) return null;
- List<UncompressedCacheChunk> toCache = null;
- List<ByteBuffer> toRelease = null;
-
- // 1. Find our bearings in the stream.
- DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
- }
-
- if (streamOffset > current.getOffset()) {
- // Target compression block is in the middle of the range; slice the range in two.
- current = current.split(streamOffset).next;
- }
- // Account for maximum cache buffer size.
- long streamLen = streamEnd - streamOffset;
- int partSize = cache.getAllocator().getMaxAllocation(),
- partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
- long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
-
- TrackedCacheChunk lastUncompressed = null;
- LlapMemoryBuffer[] singleAlloc = new LlapMemoryBuffer[1];
- for (int i = 0; i < partCount; ++i) {
- long hasEntirePartTo = -1;
- if (partOffset == current.getOffset()) {
- hasEntirePartTo = partOffset;
- // We assume cache chunks would always match the way we read, so check and skip it.
- if (current instanceof TrackedCacheChunk) {
- lastUncompressed = (TrackedCacheChunk)current;
- assert current.getOffset() == partOffset && current.getEnd() == partEnd;
- partOffset = partEnd;
- partEnd = Math.min(partOffset + partSize, streamEnd);
- continue;
- }
- }
- if (current.getOffset() >= partEnd) {
- // We have no data at all for this part of the stream (could be unneeded), skip.
- partOffset = partEnd;
- partEnd = Math.min(partOffset + partSize, streamEnd);
- continue;
- }
- if (toRelease == null && zcr != null) {
- toRelease = new ArrayList<ByteBuffer>();
- }
- // We have some disk buffers... see if we have entire part, etc.
- UncompressedCacheChunk candidateCached = null;
- DiskRangeList next = current;
- while (true) {
- if (next == null || next.getOffset() >= partEnd) {
- if (hasEntirePartTo < partEnd && candidateCached != null) {
- // We are missing a section at the end of the part...
- lastUncompressed = copyAndReplaceCandidateToNonCached(
- candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
- candidateCached = null;
- }
- break;
- }
- current = next;
- boolean wasSplit = (current.getEnd() > partEnd);
- if (wasSplit) {
- current = current.split(partEnd);
- }
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Processing uncompressed file data at ["
- + current.getOffset() + ", " + current.getEnd() + ")");
- }
- BufferChunk bc = (BufferChunk)current;
- if (!wasSplit && toRelease != null) {
- toRelease.add(bc.chunk); // TODO: is it valid to give zcr the modified 2nd part?
- }
-
- // Track if we still have the entire part.
- long hadEntirePartTo = hasEntirePartTo;
- if (hasEntirePartTo != -1) {
- hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
- }
- if (candidateCached != null && hasEntirePartTo == -1) {
- lastUncompressed = copyAndReplaceCandidateToNonCached(
- candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
- candidateCached = null;
- }
-
- if (hasEntirePartTo != -1) {
- // So far we have all the data from the beginning of the part.
- if (candidateCached == null) {
- candidateCached = new UncompressedCacheChunk(bc);
- } else {
- candidateCached.addChunk(bc);
- }
- // We will take care of this at the end of the part, or if we find a gap.
- next = current.next;
- continue;
- }
- // We don't have the entire part; just copy to an allocated buffer. We could try to
- // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
- lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
- next = lastUncompressed.next;
- }
- if (candidateCached != null) {
- if (toCache == null) {
- toCache = new ArrayList<>(partCount - i);
- }
- toCache.add(candidateCached);
- }
- }
-
- // 3. Allocate the buffers, prepare cache keys.
- if (toCache == null) return lastUncompressed; // Nothing to copy and cache.
-
- LlapMemoryBuffer[] targetBuffers =
- toCache.size() == 1 ? singleAlloc : new LlapMemoryBuffer[toCache.size()];
- targetBuffers[0] = null;
- DiskRange[] cacheKeys = new DiskRange[toCache.size()];
- int ix = 0;
- for (UncompressedCacheChunk chunk : toCache) {
- cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
- ++ix;
- }
- cache.getAllocator().allocateMultiple(
- targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
-
- // 4. Now copy the data into cache buffers.
- ix = 0;
- for (UncompressedCacheChunk candidateCached : toCache) {
- candidateCached.buffer = targetBuffers[ix];
- ByteBuffer dest = candidateCached.buffer.getByteBufferRaw();
- copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached);
- candidateCached.clear();
- lastUncompressed = candidateCached;
- ++ix;
- }
-
- // 5. Release original compressed buffers to zero-copy reader if needed.
- if (toRelease != null) {
- assert zcr != null;
- for (ByteBuffer buf : toRelease) {
- zcr.releaseBuffer(buf);
- }
- }
-
- // 6. Finally, put uncompressed data to cache.
- long[] collisionMask = cache.putFileData(
- fileId, cacheKeys, targetBuffers, baseOffset, Priority.NORMAL, qfCounters);
- processCacheCollisions(cache, collisionMask, toCache, targetBuffers, null);
-
- return lastUncompressed;
- }
-
- private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
- int startPos = dest.position(), startLim = dest.limit();
- dest.put(src); // Copy uncompressed data to cache.
- // Put moves position forward by the size of the data.
- int newPos = dest.position();
- if (newPos > startLim) {
- throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
- + ") became [" + newPos + ", " + dest.limit() + ")");
- }
- dest.position(startPos);
- dest.limit(newPos);
- }
-
-
- private static TrackedCacheChunk copyAndReplaceCandidateToNonCached(
- UncompressedCacheChunk candidateCached, long partOffset,
- long candidateEnd, LowLevelCache cache, LlapMemoryBuffer[] singleAlloc) {
- // We thought we had the entire part to cache, but we don't; convert start to
- // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
- singleAlloc[0] = null;
- cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
-
- LlapMemoryBuffer buffer = singleAlloc[0];
- cache.notifyReused(buffer);
- ByteBuffer dest = buffer.getByteBufferRaw();
- TrackedCacheChunk tcc = TCC_POOL.take();
- tcc.init(buffer, partOffset, candidateEnd);
- copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
- return tcc;
- }
-
- private static TrackedCacheChunk copyAndReplaceUncompressedToNonCached(
- BufferChunk bc, LowLevelCache cache, LlapMemoryBuffer[] singleAlloc) {
- singleAlloc[0] = null;
- cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
- LlapMemoryBuffer buffer = singleAlloc[0];
- cache.notifyReused(buffer);
- ByteBuffer dest = buffer.getByteBufferRaw();
- TrackedCacheChunk tcc = TCC_POOL.take();
- tcc.init(buffer, bc.getOffset(), bc.getEnd());
- copyUncompressedChunk(bc.chunk, dest);
- bc.replaceSelfWith(tcc);
- return tcc;
- }
-
- private static void copyAndReplaceUncompressedChunks(
- UncompressedCacheChunk candidateCached, ByteBuffer dest, TrackedCacheChunk tcc) {
- int startPos = dest.position(), startLim = dest.limit();
- BufferChunk chunk = candidateCached.getChunk();
- for (int i = 0; i < candidateCached.getCount(); ++i) {
- dest.put(chunk.getData());
- BufferChunk next = (BufferChunk)(chunk.next);
- if (i == 0) {
- chunk.replaceSelfWith(tcc);
- } else {
- chunk.removeSelf();
- }
- chunk = next;
- }
- int newPos = dest.position();
- if (newPos > startLim) {
- throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
- + ") became [" + newPos + ", " + dest.limit() + ")");
- }
- dest.position(startPos);
- dest.limit(newPos);
- }
-
- private static void decompressChunk(
- ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
- int startPos = dest.position(), startLim = dest.limit();
- codec.decompress(src, dest);
- // Codec resets the position to 0 and limit to correct limit.
- dest.position(startPos);
- int newLim = dest.limit();
- if (newLim > startLim) {
- throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
- + ") became [" + dest.position() + ", " + newLim + ")");
- }
- }
-
- public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
- while (current != null) {
- if (current instanceof ProcCacheChunk) {
- PCC_POOL.offer((ProcCacheChunk)current);
- } else if (current instanceof TrackedCacheChunk) {
- TCC_POOL.offer((TrackedCacheChunk)current);
- }
- current = current.next;
- }
- }
-
- private static void ponderReleaseInitialRefcount(LowLevelCache cache,
- long unlockUntilCOffset, long streamStartOffset, TrackedCacheChunk cc) {
- if (cc.getEnd() > unlockUntilCOffset) return;
- assert !cc.isReleased;
- releaseInitialRefcount(cache, cc, false);
- // Release all the previous buffers that we may not have been able to release due to reuse.
- DiskRangeList prev = cc.prev;
- while (true) {
- if ((prev == null) || (prev.getEnd() <= streamStartOffset)
- || !(prev instanceof TrackedCacheChunk)) break;
- TrackedCacheChunk prevCc = (TrackedCacheChunk)prev;
- if (prevCc.isReleased) break;
- releaseInitialRefcount(cache, prevCc, true);
- prev = prev.prev;
- }
- }
-
- private static void releaseInitialRefcount(
- LowLevelCache cache, TrackedCacheChunk cc, boolean isBacktracking) {
- // This is the last RG for which this buffer will be used. Remove the initial refcount
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Unlocking " + cc.buffer + " for the fetching thread"
- + (isBacktracking ? "; backtracking" : ""));
- }
- cache.releaseBuffer(cc.buffer);
- cc.isReleased = true;
- }
-
- private static void processCacheCollisions(LowLevelCache cache, long[] collisionMask,
- List<? extends TrackedCacheChunk> toDecompress, LlapMemoryBuffer[] targetBuffers,
- List<LlapMemoryBuffer> cacheBuffers) {
- if (collisionMask == null) return;
- assert collisionMask.length >= (toDecompress.size() >>> 6);
- // There are some elements that were cached in parallel, take care of them.
- long maskVal = -1;
- for (int i = 0; i < toDecompress.size(); ++i) {
- if ((i & 63) == 0) {
- maskVal = collisionMask[i >>> 6];
- }
- if ((maskVal & 1) == 1) {
- // Cache has found an old buffer for the key and put it into array instead of our new one.
- TrackedCacheChunk replacedChunk = toDecompress.get(i);
- LlapMemoryBuffer replacementBuffer = targetBuffers[i];
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Discarding data due to cache collision: " + replacedChunk.buffer
- + " replaced with " + replacementBuffer);
- }
- assert replacedChunk.buffer != replacementBuffer : i + " was not replaced in the results "
- + "even though mask is [" + Long.toBinaryString(maskVal) + "]";
- replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers);
- }
- maskVal >>= 1;
- }
- }
-
-
- /** Finds compressed offset in a stream and makes sure iter points to its position.
- This may be necessary for obscure combinations of compression and encoding boundaries. */
- private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) {
- if (offset < 0) return ranges;
- return findIntersectingPosition(ranges, offset, offset);
- }
-
- private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) {
- if (offset < 0) return ranges;
- // We expect the offset to be valid TODO: rather, validate
- while (ranges.getEnd() <= offset) {
- ranges = ranges.next;
- }
- while (ranges.getOffset() > end) {
- ranges = ranges.prev;
- }
- // We are now on some intersecting buffer, find the first intersecting buffer.
- while (ranges.prev != null && ranges.prev.getEnd() > offset) {
- ranges = ranges.prev;
- }
- return ranges;
- }
-
- private static class IncompleteCb extends DiskRangeList {
- public IncompleteCb(long offset, long end) {
- super(offset, end);
- }
-
- @Override
- public String toString() {
- return "incomplete CB start: " + offset + " end: " + end;
- }
- }
-
- /**
- * Reads one compression block from the source; handles compression blocks read from
- * multiple ranges (usually, that would only happen with zcr).
- * Adds stuff to cachedBuffers, toDecompress and toRelease (see below what each does).
- * @param current BufferChunk where compression block starts.
- * @param ranges Iterator of all chunks, pointing at current.
- * @param cacheBuffers The result buffer array to add pre-allocated target cache buffer.
- * @param toDecompress The list of work to decompress - pairs of compressed buffers and the
- * target buffers (same as the ones added to cacheBuffers).
- * @param toRelease The list of buffers to release to zcr because they are no longer in use.
- * @return The resulting cache chunk.
- */
- private static ProcCacheChunk addOneCompressionBuffer(BufferChunk current, ZeroCopyReaderShim zcr,
- int bufferSize, LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
- List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws IOException {
- ByteBuffer slice = null;
- ByteBuffer compressed = current.chunk;
- long cbStartOffset = current.getOffset();
- int b0 = compressed.get() & 0xff;
- int b1 = compressed.get() & 0xff;
- int b2 = compressed.get() & 0xff;
- int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
- if (chunkLength > bufferSize) {
- throw new IllegalArgumentException("Buffer size too small. size = " +
- bufferSize + " needed = " + chunkLength);
- }
- int consumedLength = chunkLength + OutStream.HEADER_SIZE;
- long cbEndOffset = cbStartOffset + consumedLength;
- boolean isUncompressed = ((b0 & 0x01) == 1);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
- + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
- }
- if (compressed.remaining() >= chunkLength) {
- // Simple case - CB fits entirely in the disk range.
- slice = compressed.slice();
- slice.limit(chunkLength);
- ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset,
- cbEndOffset, chunkLength, current, cache, toDecompress, cacheBuffers);
- if (compressed.remaining() <= 0 && zcr != null) {
- toRelease.add(compressed);
- }
- return cc;
- }
- if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
- addIncompleteCompressionBuffer(cbStartOffset, current, 0);
- return null; // This is impossible to read from this chunk.
- }
-
- // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
- // We need to consolidate 2 or more buffers into one to decompress.
- ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
- int remaining = chunkLength - compressed.remaining();
- int originalPos = compressed.position();
- copy.put(compressed);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Removing partial CB " + current + " from ranges after copying its contents");
- }
- DiskRangeList next = current.next;
- current.removeSelf();
- if (zcr != null) {
- if (originalPos == 0) {
- zcr.releaseBuffer(compressed); // We copied the entire buffer.
- } else {
- toRelease.add(compressed); // There might be slices depending on this buffer.
- }
- }
-
- int extraChunkCount = 0;
- while (true) {
- if (!(next instanceof BufferChunk)) {
- throw new IOException("Trying to extend compressed block into uncompressed block " + next);
- }
- compressed = next.getData();
- ++extraChunkCount;
- if (compressed.remaining() >= remaining) {
- // This is the last range for this compression block. Yay!
- slice = compressed.slice();
- slice.limit(remaining);
- copy.put(slice);
- ProcCacheChunk cc = addOneCompressionBlockByteBuffer(
- copy, isUncompressed, cbStartOffset, cbEndOffset, remaining,
- (BufferChunk)next, cache, toDecompress, cacheBuffers);
- if (compressed.remaining() <= 0 && zcr != null) {
- zcr.releaseBuffer(compressed); // We copied the entire buffer.
- }
- return cc;
- }
- remaining -= compressed.remaining();
- copy.put(compressed);
- if (zcr != null) {
- zcr.releaseBuffer(compressed); // We copied the entire buffer.
- }
- DiskRangeList tmp = next;
- next = next.hasContiguousNext() ? next.next : null;
- if (next != null) {
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Removing partial CB " + tmp + " from ranges after copying its contents");
- }
- tmp.removeSelf();
- } else {
- addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount);
- return null; // This is impossible to read from this chunk.
- }
- }
- }
-
- private static void addIncompleteCompressionBuffer(
- long cbStartOffset, DiskRangeList target, int extraChunkCount) {
- IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
- + icb + " in the buffers");
- }
- target.replaceSelfWith(icb);
- }
-
- /**
- * Add one buffer with compressed data to the results for addOneCompressionBuffer (see javadoc).
- * @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data.
- * @param isUncompressed Whether the data in the block is uncompressed.
- * @param cbStartOffset Compressed start offset of the fCB.
- * @param cbEndOffset Compressed end offset of the fCB.
- * @param lastRange The buffer from which the last (or all) bytes of fCB come.
- * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
- * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange.
- * @param lastChunk
- * @param toDecompress See addOneCompressionBuffer.
- * @param cacheBuffers See addOneCompressionBuffer.
- * @return New cache buffer.
- */
- private static ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
- boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
- BufferChunk lastChunk, LowLevelCache cache, List<ProcCacheChunk> toDecompress,
- List<LlapMemoryBuffer> cacheBuffers) {
- // Prepare future cache buffer.
- LlapMemoryBuffer futureAlloc = cache.createUnallocated();
- // Add it to result in order we are processing.
- cacheBuffers.add(futureAlloc);
- // Add it to the list of work to decompress.
- ProcCacheChunk cc = PCC_POOL.take();
- cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
- fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
- toDecompress.add(cc);
- // Adjust the compression block position.
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
- }
- lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
- // Finally, put it in the ranges list for future use (if shared between RGs).
- // Before anyone else accesses it, it would have been allocated and decompressed locally.
- if (lastChunk.chunk.remaining() <= 0) {
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
- }
- lastChunk.replaceSelfWith(cc);
- } else {
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
- }
- lastChunk.insertPartBefore(cc);
+ return new CompressedStream(fileId, name, input, length, codec, bufferSize);
}
- return cc;
}
}
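
For reference, the reworked factory no longer takes a LowLevelCache. Below is a minimal sketch of the post-patch call shape, modeled on the call sites updated later in this commit; it assumes the sketch sits in the org.apache.hadoop.hive.ql.io.orc package, and the stream name, null fileId and buffer contents are placeholder assumptions, not committed code:

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

class InStreamCreateSketch {
  static InputStream openStream(ByteBuffer bb, long length,
      CompressionCodec codec, int bufferSize) throws IOException {
    // Wrap the already-read bytes as one DiskRange starting at stream offset 0.
    List<DiskRange> ranges = Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0));
    // No LowLevelCache argument anymore; call sites pass a null fileId for metadata streams.
    return InStream.create(null, "footer", ranges, length, codec, bufferSize);
  }
}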
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
index d08ad1f..8b88e05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReaderImpl.java
@@ -83,12 +83,12 @@ public class MetadataReaderImpl implements MetadataReader {
file.readFully(buffer);
indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
- codec, bufferSize, null));
+ codec, bufferSize));
if (readBloomFilter) {
bb.position((int) stream.getLength());
bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
null, "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
- nextStream.getLength(), codec, bufferSize, null));
+ nextStream.getLength(), codec, bufferSize));
}
}
}
@@ -109,7 +109,7 @@ public class MetadataReaderImpl implements MetadataReader {
file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer",
Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
- tailLength, codec, bufferSize, null));
+ tailLength, codec, bufferSize));
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index 1eb0dec..a92b455 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,11 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -318,9 +316,8 @@ public interface Reader {
MetadataReader metadata() throws IOException;
- EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache,
- LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> consumer)
- throws IOException;
+ EncodedReader encodedReader(
+ long fileId, DataCache dataCache, DataReader dataReader) throws IOException;
List<Integer> getVersionList();
@@ -331,4 +328,6 @@ public interface Reader {
List<StripeStatistics> getStripeStatistics();
List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics();
+
+ DataReader createDefaultDataReader(boolean useZeroCopy);
}
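
Taken together, the interface now separates the cache from the data reader. A rough usage sketch of the new method shapes follows; the reader, fileId and dataCache values are assumed to be supplied by the caller (e.g. the LLAP IO layer), and the sketch is illustrative rather than the committed wiring code:

import java.io.IOException;
import org.apache.hadoop.hive.common.io.storage_api.DataCache;
import org.apache.hadoop.hive.common.io.storage_api.DataReader;

class EncodedReaderSketch {
  // Hedged sketch: obtain a default DataReader from the Reader, then pair it
  // with a caller-provided DataCache to build an EncodedReader.
  static EncodedReader open(Reader reader, long fileId, DataCache dataCache)
      throws IOException {
    DataReader dataReader = reader.createDefaultDataReader(true /* useZeroCopy */);
    return reader.encodedReader(fileId, dataCache, dataReader);
  }
}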
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 7d14e04..36d8e0e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -34,14 +34,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Footer;
import org.apache.hadoop.hive.ql.io.FileFormatException;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
@@ -505,13 +501,13 @@ public class ReaderImpl implements Reader {
footerBuffer.limit(position + metadataSize);
InputStream instream = InStream.create(null, "metadata", Lists.<DiskRange>newArrayList(
- new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize, null);
+ new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize);
this.metadata = OrcProto.Metadata.parseFrom(instream);
footerBuffer.position(position + metadataSize);
footerBuffer.limit(position + metadataSize + footerBufferSize);
instream = InStream.create(null, "footer", Lists.<DiskRange>newArrayList(
- new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize, null);
+ new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize);
this.footer = OrcProto.Footer.parseFrom(instream);
footerBuffer.position(position);
@@ -722,13 +718,10 @@ public class ReaderImpl implements Reader {
}
@Override
- public EncodedReader encodedReader(long fileId, LowLevelCache lowLevelCache,
- LowLevelCacheCounters qfCounters, Consumer<OrcEncodedColumnBatch> consumer)
- throws IOException {
- boolean useZeroCopy = (conf != null)
- && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY);
- return new EncodedReaderImpl(fileSystem, path, fileId, useZeroCopy, types,
- codec, bufferSize, rowIndexStride, lowLevelCache, qfCounters, consumer);
+ public EncodedReader encodedReader(
+ long fileId, DataCache dataCache, DataReader dataReader) throws IOException {
+ return new EncodedReaderImpl(fileId, types,
+ codec, bufferSize, rowIndexStride, dataCache, dataReader);
}
@Override
@@ -740,4 +733,9 @@ public class ReaderImpl implements Reader {
public int getMetadataSize() {
return metadataSize;
}
+
+ @Override
+ public DataReader createDefaultDataReader(boolean useZeroCopy) {
+ return RecordReaderUtils.createDefaultDataReader(fileSystem, path, useZeroCopy, codec);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
index eeb2f7d..dba9071 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
@@ -18,16 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.util.List;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
/**
* A row-by-row iterator for ORC files.
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 3b98562..5117baf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -34,35 +34,30 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
import org.apache.hadoop.io.Text;
public class RecordReaderImpl implements RecordReader {
static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
private final Path path;
- private final FileSystem fileSystem;
- private final FSDataInputStream file;
private final long firstRow;
private final List<StripeInformation> stripes =
new ArrayList<StripeInformation>();
@@ -88,9 +83,7 @@ public class RecordReaderImpl implements RecordReader {
private boolean[] includedRowGroups = null;
private final Configuration conf;
private final MetadataReader metadata;
-
- private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
- private final ZeroCopyReaderShim zcr;
+ private final DataReader dataReader;
public final static class Index {
OrcProto.RowIndex[] rowGroupIndex;
@@ -163,9 +156,7 @@ public class RecordReaderImpl implements RecordReader {
long strideRate,
Configuration conf
) throws IOException {
- this.fileSystem = fileSystem;
this.path = path;
- this.file = fileSystem.open(path);
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
@@ -194,9 +185,10 @@ public class RecordReaderImpl implements RecordReader {
}
}
- final boolean zeroCopy = (conf != null)
- && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
- zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
+ final boolean zeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
+ // TODO: we could change the ctor to pass this externally
+ this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, zeroCopy, codec);
+ this.dataReader.open();
firstRow = skippedRows;
totalRowCount = rows;
@@ -766,16 +758,16 @@ public class RecordReaderImpl implements RecordReader {
is.close();
}
if (bufferChunks != null) {
- if (zcr != null) {
+ if (dataReader.isTrackingDiskRanges()) {
for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
if (!(range instanceof BufferChunk)) {
continue;
}
- zcr.releaseBuffer(((BufferChunk) range).chunk);
+ dataReader.releaseBuffer(((BufferChunk) range).chunk);
}
}
- bufferChunks = null;
}
+ bufferChunks = null;
streams.clear();
}
@@ -835,10 +827,9 @@ public class RecordReaderImpl implements RecordReader {
long end = start + stripe.getDataLength();
// explicitly trigger 1 big read
DiskRangeList toRead = new DiskRangeList(start, end);
- bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
+ bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
- // TODO: decompressed data from streams should be put in cache
}
/**
@@ -891,37 +882,6 @@ public class RecordReaderImpl implements RecordReader {
}
}
- public static class CacheChunk extends DiskRangeList {
- public LlapMemoryBuffer buffer;
-
- public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
- super(offset, end);
- this.buffer = buffer;
- }
-
- @Override
- public boolean hasData() {
- return buffer != null;
- }
-
- @Override
- public ByteBuffer getData() {
- // Callers duplicate the buffer, they have to for BufferChunk
- return buffer.getByteBufferRaw();
- }
-
- @Override
- public String toString() {
- return "start: " + offset + " end: " + end + " cache buffer: " + buffer;
- }
-
- @Override
- public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
- throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
- + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
- }
- }
-
/**
* Plan the ranges of the file that we need to read given the list of
* columns and row groups.
@@ -949,7 +909,7 @@ public class RecordReaderImpl implements RecordReader {
long offset = 0;
// figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+ CreateHelper list = new CreateHelper();
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
int column = stream.getColumn();
@@ -992,7 +952,7 @@ public class RecordReaderImpl implements RecordReader {
ranges, streamOffset, streamDesc.getLength());
StreamName name = new StreamName(column, streamDesc.getKind());
streams.put(name, InStream.create(null, name.toString(), buffers,
- streamDesc.getLength(), codec, bufferSize, null));
+ streamDesc.getLength(), codec, bufferSize));
streamOffset += streamDesc.getLength();
}
}
@@ -1005,7 +965,7 @@ public class RecordReaderImpl implements RecordReader {
if (LOG.isDebugEnabled()) {
LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
}
- bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
+ bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
if (LOG.isDebugEnabled()) {
LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
}
@@ -1162,8 +1122,7 @@ public class RecordReaderImpl implements RecordReader {
@Override
public void close() throws IOException {
clearStreams();
- pool.clear();
- file.close();
+ dataReader.close();
}
@Override
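
The stripe-read path above reduces to the following DataReader lifecycle; this is a condensed, hedged sketch of the calls shown in the hunks above, assuming it lives in the org.apache.hadoop.hive.ql.io.orc package (createDefaultDataReader is package-private). The stripe offsets, range bounds and surrounding method are placeholders, and the stream-decoding step is elided:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRangeList;
import org.apache.hadoop.hive.common.io.storage_api.DataReader;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

class DataReaderLifecycleSketch {
  static void readStripe(FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec,
      long start, long end, long stripeOffset) throws IOException {
    DataReader dataReader = RecordReaderUtils.createDefaultDataReader(fs, path, useZeroCopy, codec);
    dataReader.open();                                     // opens the file, and the zero-copy pool if enabled
    DiskRangeList toRead = new DiskRangeList(start, end);  // one big read for the stripe data
    DiskRangeList chunks = dataReader.readFileData(toRead, stripeOffset, false);
    // ... create and decode streams from "chunks" here ...
    if (dataReader.isTrackingDiskRanges()) {               // true only when zero-copy buffers were handed out
      for (DiskRangeList r = chunks; r != null; r = r.next) {
        if (r instanceof BufferChunk) {
          dataReader.releaseBuffer(((BufferChunk) r).chunk);
        }
      }
    }
    dataReader.close();                                    // closes the file and clears the pool
  }
}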
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
index 429c293e..2a57916 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
@@ -29,12 +29,11 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -48,6 +47,69 @@ import com.google.common.collect.ComparisonChain;
*/
public class RecordReaderUtils {
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
+
+ private static class DefaultDataReader implements DataReader {
+ private FSDataInputStream file;
+ private ByteBufferAllocatorPool pool;
+ private ZeroCopyReaderShim zcr;
+ private FileSystem fs;
+ private Path path;
+ private boolean useZeroCopy;
+ private CompressionCodec codec;
+
+ public DefaultDataReader(
+ FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
+ this.fs = fs;
+ this.path = path;
+ this.useZeroCopy = useZeroCopy;
+ this.codec = codec;
+ }
+
+ @Override
+ public void open() throws IOException {
+ this.file = fs.open(path);
+ if (useZeroCopy) {
+ pool = new ByteBufferAllocatorPool();
+ zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+ } else {
+ pool = null;
+ zcr = null;
+ }
+ }
+
+ @Override
+ public DiskRangeList readFileData(
+ DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+ return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (file != null) {
+ file.close();
+ }
+ if (pool != null) {
+ pool.clear();
+ }
+ }
+
+ @Override
+ public boolean isTrackingDiskRanges() {
+ return zcr != null;
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ zcr.releaseBuffer(buffer);
+ }
+
+ }
+
+ static DataReader createDefaultDataReader(
+ FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
+ return new DefaultDataReader(fs, path, useZeroCopy, codec);
+ }
+
static boolean[] findPresentStreamsByColumn(
List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
boolean[] hasNull = new boolean[types.size()];
@@ -75,14 +137,14 @@ public class RecordReaderUtils {
}
static void addEntireStreamToRanges(
- long offset, long length, DiskRangeListCreateHelper list, boolean doMergeBuffers) {
+ long offset, long length, CreateHelper list, boolean doMergeBuffers) {
list.addOrMerge(offset, offset + length, doMergeBuffers, false);
}
static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
- long offset, long length, DiskRangeListCreateHelper list, boolean doMergeBuffers) {
+ long offset, long length, CreateHelper list, boolean doMergeBuffers) {
for (int group = 0; group < includedRowGroups.length; ++group) {
if (!includedRowGroups[group]) continue;
int posn = getIndexPosition(
@@ -204,7 +266,7 @@ public class RecordReaderUtils {
* @param ranges ranges to stringify
* @return the resulting string
*/
- static String stringifyDiskRanges(DiskRangeList range) {
+ public static String stringifyDiskRanges(DiskRangeList range) {
StringBuilder buffer = new StringBuilder();
buffer.append("[");
boolean isFirst = true;
@@ -240,7 +302,7 @@ public class RecordReaderUtils {
if (range == null) return null;
DiskRangeList prev = range.prev;
if (prev == null) {
- prev = new DiskRangeListMutateHelper(range);
+ prev = new MutateHelper(range);
}
while (range != null) {
if (range.hasData()) {
@@ -434,31 +496,4 @@ public class RecordReaderUtils {
}
}
}
-
- public static long getFileId(FileSystem fileSystem, Path path) throws IOException {
- String pathStr = path.toUri().getPath();
- if (fileSystem instanceof DistributedFileSystem) {
- return SHIMS.getFileId(fileSystem, pathStr);
- }
- // If we are not on DFS, we just hash the file name + size and hope for the best.
- // TODO: we assume it only happens in tests. Fix?
- int nameHash = pathStr.hashCode();
- long fileSize = fileSystem.getFileStatus(path).getLen();
- long id = ((fileSize ^ (fileSize >>> 32)) << 32) | ((long)nameHash & 0xffffffffL);
- RecordReaderImpl.LOG.warn("Cannot get unique file ID from "
- + fileSystem.getClass().getSimpleName() + "; using " + id + "(" + pathStr
- + "," + nameHash + "," + fileSize + ")");
- return id;
- }
-
- // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
- // is still going to work. Otherwise, file IDs can be turned off. Later, we should use
- // as public utility method in HDFS to obtain the inode-based path.
- private static String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/";
-
- public static Path getFileIdPath(
- FileSystem fileSystem, Path path, long fileId) {
- return (fileSystem instanceof DistributedFileSystem)
- ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path;
- }
}
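
Note (not part of the commit): DefaultDataReader above hides the FSDataInputStream, the optional zero-copy shim and its buffer pool behind the DataReader interface. A hedged usage sketch, assuming only the methods visible in this hunk (open, readFileData, isTrackingDiskRanges, releaseBuffer, close); fs, path, useZeroCopy, codec, start, end and stripeOffset are assumed variables, and the release loop is illustrative rather than the reader's actual call sequence.

    DataReader reader = RecordReaderUtils.createDefaultDataReader(fs, path, useZeroCopy, codec);
    reader.open();
    try {
      DiskRangeList toRead = new DiskRangeList(start, end);      // plan one contiguous read
      DiskRangeList data = reader.readFileData(toRead, stripeOffset, false);
      // ... create streams / decode from the returned chunks ...
      if (reader.isTrackingDiskRanges()) {
        // Zero-copy buffers have to be handed back once they are no longer needed.
        for (DiskRangeList r = data; r != null; r = r.next) {
          if (r.hasData()) {
            reader.releaseBuffer(r.getData());
          }
        }
      }
    } finally {
      reader.close();   // closes the file and clears the ByteBuffer pool
    }
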
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java
index 26f4c01..4c884be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java
@@ -21,8 +21,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hive.common.DiskRangeInfo;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
/**
* Stream utility.
@@ -39,7 +39,7 @@ public class StreamUtils {
* @throws IOException
*/
public static SettableUncompressedStream createSettableUncompressedStream(String streamName,
- Long fileId, EncodedColumnBatch.StreamBuffer streamBuffer) throws IOException {
+ Long fileId, ColumnStreamData streamBuffer) throws IOException {
if (streamBuffer == null) {
return null;
}
@@ -54,11 +54,11 @@ public class StreamUtils {
* @param streamBuffer - stream buffer
* @return - total length of disk ranges
*/
- public static DiskRangeInfo createDiskRangeInfo(EncodedColumnBatch.StreamBuffer streamBuffer) {
- DiskRangeInfo diskRangeInfo = new DiskRangeInfo(streamBuffer.indexBaseOffset);
+ public static DiskRangeInfo createDiskRangeInfo(ColumnStreamData streamBuffer) {
+ DiskRangeInfo diskRangeInfo = new DiskRangeInfo(streamBuffer.getIndexBaseOffset());
long offset = diskRangeInfo.getTotalLength(); // See ctor comment.
// TODO: we should get rid of this
- for (LlapMemoryBuffer memoryBuffer : streamBuffer.cacheBuffers) {
+ for (MemoryBuffer memoryBuffer : streamBuffer.getCacheBuffers()) {
ByteBuffer buffer = memoryBuffer.getByteBufferDup();
diskRangeInfo.addDiskRange(new RecordReaderImpl.BufferChunk(buffer, offset));
offset += buffer.remaining();
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 9ff465d..e08b446 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -30,8 +30,8 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -65,11 +65,11 @@ public class TreeReaderFactory {
protected final int columnId;
protected BitFieldReader present = null;
protected boolean valuePresent = false;
- protected EncodedColumnBatch.StreamBuffer presentStreamBuffer = null;
- protected EncodedColumnBatch.StreamBuffer dataStreamBuffer = null;
- protected EncodedColumnBatch.StreamBuffer dictionaryStreamBuffer = null;
- protected EncodedColumnBatch.StreamBuffer lengthsStreamBuffer = null;
- protected EncodedColumnBatch.StreamBuffer secondaryStreamBuffer = null;
+ protected ColumnStreamData presentStreamBuffer = null;
+ protected ColumnStreamData dataStreamBuffer = null;
+ protected ColumnStreamData dictionaryStreamBuffer = null;
+ protected ColumnStreamData lengthsStreamBuffer = null;
+ protected ColumnStreamData secondaryStreamBuffer = null;
TreeReader(int columnId) throws IOException {
this(columnId, null);
@@ -137,11 +137,11 @@ public class TreeReaderFactory {
}
}
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
// stream buffers are arranged in enum order of stream kind
- for (EncodedColumnBatch.StreamBuffer streamBuffer : buffers) {
- switch (streamBuffer.streamKind) {
+ for (ColumnStreamData streamBuffer : buffers) {
+ switch (streamBuffer.getStreamKind()) {
case 0:
// PRESENT stream
presentStreamBuffer = streamBuffer;
@@ -163,7 +163,7 @@ public class TreeReaderFactory {
secondaryStreamBuffer = streamBuffer;
break;
default:
- throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+ throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind());
}
}
}
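
Note (not part of the commit): setBuffers above dispatches on ColumnStreamData.getStreamKind(), whose value is the ordinal of the ORC stream kind (0 = PRESENT, 1 = DATA, and so on, matching OrcProto.Stream.Kind). A hedged sketch of what a caller might hand in; treeReader is an assumed TreeReader instance, and populating the cache buffers is omitted.

    // Illustrative only: one PRESENT and one DATA stream for a single column.
    ColumnStreamData present = new ColumnStreamData();
    present.init(OrcProto.Stream.Kind.PRESENT.getNumber());   // ordinal 0
    ColumnStreamData data = new ColumnStreamData();
    data.init(OrcProto.Stream.Kind.DATA.getNumber());         // ordinal 1
    treeReader.setBuffers(new ColumnStreamData[] { present, data }, false);
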
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java
new file mode 100644
index 0000000..ce4328f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/Consumer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.llap;
+
+/**
+ * Data consumer; an equivalent of a data queue for an asynchronous data producer.
+ */
+public interface Consumer<T> {
+ /** Some data has been produced. */
+ public void consumeData(T data);
+ /** No more data will be produced; done */
+ public void setDone();
+ /** No more data will be produced; error during production */
+ public void setError(Throwable t);
+}
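
Note (not part of the commit): Consumer<T> is the callback contract between the asynchronous producer (the encoded reader) and whoever processes its output. A minimal, purely illustrative implementation using the OrcEncodedColumnBatch type referenced elsewhere in this commit:

    Consumer<OrcEncodedColumnBatch> printer = new Consumer<OrcEncodedColumnBatch>() {
      @Override
      public void consumeData(OrcEncodedColumnBatch data) {
        // Process one batch of encoded column data as it is produced.
      }
      @Override
      public void setDone() {
        // Producer finished normally; no more consumeData calls will follow.
      }
      @Override
      public void setError(Throwable t) {
        // Producer failed; surface or log the error.
      }
    };
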
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java
new file mode 100644
index 0000000..0249168
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcBatchKey.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc.llap;
+
+public class OrcBatchKey {
+ public long file;
+ public int stripeIx, rgIx;
+
+ public OrcBatchKey(long file, int stripeIx, int rgIx) {
+ set(file, stripeIx, rgIx);
+ }
+
+ public void set(long file, int stripeIx, int rgIx) {
+ this.file = file;
+ this.stripeIx = stripeIx;
+ this.rgIx = rgIx;
+ }
+
+ @Override
+ public String toString() {
+ return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = prime + (int)(file ^ (file >>> 32));
+ return (prime * result + rgIx) * prime + stripeIx;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof OrcBatchKey)) return false;
+ OrcBatchKey other = (OrcBatchKey)obj;
+ // All fields are primitives, so direct comparison is sufficient.
+ return stripeIx == other.stripeIx && rgIx == other.rgIx && file == other.file;
+ }
+
+ @Override
+ public OrcBatchKey clone() throws CloneNotSupportedException {
+ return new OrcBatchKey(file, stripeIx, rgIx);
+ }
+}
\ No newline at end of file
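
Note (not part of the commit): OrcBatchKey is a mutable composite key (file id, stripe index, row-group index) with a set() method so one instance can be reused for probing. A hedged usage sketch; the map, fileId, stripeIx, rgCount and loadBatch are assumed for illustration only.

    // Illustrative only: reuse one probe key for lookups, store an independent copy.
    Map<OrcBatchKey, Object> batches = new HashMap<>();
    OrcBatchKey probe = new OrcBatchKey(fileId, stripeIx, 0);
    for (int rg = 0; rg < rgCount; ++rg) {
      probe.set(fileId, stripeIx, rg);
      if (!batches.containsKey(probe)) {
        batches.put(new OrcBatchKey(fileId, stripeIx, rg), loadBatch(rg)); // copy, not the probe
      }
    }
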
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java
new file mode 100644
index 0000000..2be70e5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/llap/OrcCacheKey.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc.llap;
+
+public class OrcCacheKey extends OrcBatchKey {
+ public int colIx;
+
+ public OrcCacheKey(long file, int stripeIx, int rgIx, int colIx) {
+ super(file, stripeIx, rgIx);
+ this.colIx = colIx;
+ }
+
+ public OrcCacheKey(OrcBatchKey batchKey, int colIx) {
+ super(batchKey.file, batchKey.stripeIx, batchKey.rgIx);
+ this.colIx = colIx;
+ }
+
+ public OrcBatchKey copyToPureBatchKey() {
+ return new OrcBatchKey(file, stripeIx, rgIx);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + ", rgIx " + colIx + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ return super.hashCode() * prime + colIx;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof OrcCacheKey)) return false;
+ OrcCacheKey other = (OrcCacheKey)obj;
+ // All fields are primitives, so direct comparison is sufficient.
+ return stripeIx == other.stripeIx && rgIx == other.rgIx
+ && file == other.file && other.colIx == colIx;
+ }
+}
\ No newline at end of file
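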
[2/4] hive git commit: HIVE-11259 : LLAP: clean up ORC dependencies
part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
index e66955d..e9c32ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -26,58 +28,60 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheListHelper;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk;
+import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Encoded reader implementation.
*
* Note about refcounts on cache blocks.
- * When we get or put blocks into cache, they are "locked" (refcount++). After that, we send the
- * blocks out to processor as part of RG data; one block can be used for multiple RGs. In some
- * cases, one block is sent for ALL rgs (e.g. a dictionary for string column). This is how we deal
- * with this:
- * For non-dictionary case:
- * 1) At all times, every buffer has +1 refcount for each time we sent this block to processing.
- * 2) When processor is done with an RG, it decrefs for all the blocks involved.
- * 3) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we send
- * the block to processor, and the latter decrefs it, the block won't be evicted when we want
- * to reuse it for some other RG, forcing us to do an extra disk read or cache lookup.
- * 4) As we read (we always read RGs in order, assume they are stored in physical order in the
- * file, plus that CBs are not shared between streams, AND that we read each stream from the
- * beginning), we note which blocks cannot possibly be reused anymore (next RG starts in the
- * next CB). We decref for the refcount from (3) in such case.
- * 5) Given that RG end boundary in ORC is an estimate, so we can request data from cache and then
- * not use it, at the end we go thru all the blocks, and release those not released by (4).
+ * When we get or put blocks into cache, they are "locked" (refcount++), so they cannot be evicted.
+ * We send the MemoryBuffer-s to caller as part of RG data; one MemoryBuffer can be used for many
+ * RGs (e.g. a dictionary, or multiple RGs per block). Also, we want to "unlock" MemoryBuffer-s in
+ * cache as soon as possible. This is how we deal with this:
+ *
* For dictionary case:
- * 1) We have a separate refcount on the ColumnBuffer object we send to the processor (in the
- * non-dictionary case, it's always 1, so when processor is done it goes directly to decrefing
- * cache buffers).
- * 2) In the dictionary case, it's increased per RG, and processors don't touch cache buffers if
- * they do not happen to decref this refcount to 0.
- * 3) This is done because dictionary can have many buffers; decrefing all of them for all RGs
- * is more expensive; plus, decrefing in cache may be more expensive due to cache policy/etc.
+ * 1) There's a separate refcount on the StreamBuffer object we send to the caller. In the
+ * dictionary case, it's increased per RG, and callers don't release MBs if the containing
+ * StreamBuffer is not ready to be released. This is done because dictionary can have many
+ * buffers; decrefing all of them for all RGs is more expensive; plus, decrefing in cache
+ * may be more expensive due to cache policy/etc.
+ *
+ * For non-dictionary case:
+ * 1) All the StreamBuffer-s for normal data always have refcount 1, because we return them once.
+ * 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of SB.
+ * 3) When caller is done, it therefore decrefs SB to 0, and decrefs all the MBs involved.
+ * 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
+ * the MB to the caller, and the caller decrefs it, the MB can't be evicted and will be there if we want to
+ * reuse it for some other RG.
+ * 5) As we read (we always read RGs in order and forward in each stream; we assume they are stored
+ * physically in order in the file; AND that CBs are not shared between streams), we note which
+ * MBs cannot possibly be reused anymore (next RG starts in the next CB). We decref the refcount
+ * from (4) in such case.
+ * 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
+ * not use it; thus, at the end we go thru all the MBs, and release those not released by (5).
*/
public class EncodedReaderImpl implements EncodedReader {
public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
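
Note (not part of the commit): the class comment above describes the refcount protocol from the reader's side. From the caller's side, the non-dictionary case boils down to: when a ColumnStreamData is no longer needed, drop its refcount, and once it reaches zero release every MemoryBuffer it carries back to the cache. A hedged sketch, assuming ColumnStreamData exposes a decRef() counterpart to the incRef() used later in this file and that DataCache.releaseBuffer undoes the per-return lock.

    // Illustrative caller-side cleanup for the non-dictionary case (not part of this commit).
    void releaseStreamData(ColumnStreamData csd, DataCache cache) {
      if (csd.decRef() > 0) {
        return; // still shared (dictionary case); the last holder releases the buffers
      }
      for (MemoryBuffer mb : csd.getCacheBuffers()) {
        cache.releaseBuffer(mb); // undo the +1 taken when the buffer was handed out
      }
    }
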
@@ -114,52 +118,67 @@ public class EncodedReaderImpl implements EncodedReader {
t.reset();
}
});
- public static final FixedSizedObjectPool<StreamBuffer> SB_POOL =
- new FixedSizedObjectPool<>(8192, new PoolObjectHelper<StreamBuffer>() {
+ public static final FixedSizedObjectPool<ColumnStreamData> SB_POOL =
+ new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ protected ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ protected void resetBeforeOffer(ColumnStreamData t) {
+ t.reset();
+ }
+ });
+ private static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+ @Override
+ protected CacheChunk create() {
+ return new CacheChunk();
+ }
+ @Override
+ protected void resetBeforeOffer(CacheChunk t) {
+ t.reset();
+ }
+ });
+ private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
@Override
- protected StreamBuffer create() {
- return new StreamBuffer();
+ protected ProcCacheChunk create() {
+ return new ProcCacheChunk();
}
@Override
- protected void resetBeforeOffer(StreamBuffer t) {
+ protected void resetBeforeOffer(ProcCacheChunk t) {
t.reset();
}
});
+ private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+ @Override
+ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, offset, end);
+ return tcc;
+ }
+ };
private final long fileId;
- private FSDataInputStream file;
+ private final DataReader dataReader;
+ private boolean isDataReaderOpen = false;
private final CompressionCodec codec;
private final int bufferSize;
private final List<OrcProto.Type> types;
- private ZeroCopyReaderShim zcr;
private final long rowIndexStride;
- private final LowLevelCache cache;
+ private final DataCache cache;
private ByteBufferAllocatorPool pool;
- // For now, one consumer for all calls.
- private final Consumer<OrcEncodedColumnBatch> consumer;
- private final LowLevelCacheCounters qfCounters;
- private final FileSystem fs;
- private final Path path;
- private final boolean useZeroCopy;
-
- public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, boolean useZeroCopy,
- List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate,
- LowLevelCache cache, LowLevelCacheCounters qfCounters,
- Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
+ private boolean isDebugTracingEnabled;
+
+ public EncodedReaderImpl(long fileId, List<OrcProto.Type> types, CompressionCodec codec,
+ int bufferSize, long strideRate, DataCache cache, DataReader dataReader) throws IOException {
this.fileId = fileId;
- this.fs = fileSystem;
- this.path = path;
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
this.rowIndexStride = strideRate;
this.cache = cache;
- this.qfCounters = qfCounters;
- this.consumer = consumer;
- this.useZeroCopy = useZeroCopy;
- if (useZeroCopy && !cache.getAllocator().isDirectAlloc()) {
- throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
- + "buffers; either disable zero-copy or enable direct cache allocation");
- }
+ this.dataReader = dataReader;
}
/** Helper context for each column being read */
@@ -228,7 +247,7 @@ public class EncodedReaderImpl implements EncodedReader {
/** Iterators for the buffers; used to maintain position in per-rg reading. */
DiskRangeList bufferIter;
/** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
- StreamBuffer stripeLevelStream;
+ ColumnStreamData stripeLevelStream;
@Override
public String toString() {
@@ -249,17 +268,15 @@ public class EncodedReaderImpl implements EncodedReader {
} else {
batchKey.set(fileId, stripeIx, rgIx);
}
- if (columnIxs == null || columnCount != columnIxs.length) {
- columnIxs = new int[columnCount];
- columnData = new StreamBuffer[columnCount][];
- }
+ resetColumnArrays(columnCount);
}
}
@Override
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
- boolean[] included, boolean[][] colRgs) throws IOException {
+ boolean[] included, boolean[][] colRgs,
+ Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
// Note: for now we don't have to setError here, caller will setError if we throw.
// We are also not supposed to call setDone, since we are only part of the operation.
long stripeOffset = stripe.getOffset();
@@ -267,8 +284,8 @@ public class EncodedReaderImpl implements EncodedReader {
long offset = 0; // Stream offset in relation to the stripe.
// 1.1. Figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
+ if (isDebugTracingEnabled) {
+ LOG.info("The following columns have PRESENT streams: " + arrayToString(hasNull));
}
// We assume stream list is sorted by column and that non-data
@@ -278,7 +295,7 @@ public class EncodedReaderImpl implements EncodedReader {
ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
boolean[] includedRgs = null;
boolean isCompressed = (codec != null);
- DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
+ CreateHelper listToRead = new CreateHelper();
boolean hasIndexOnlyCols = false;
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
@@ -288,7 +305,7 @@ public class EncodedReaderImpl implements EncodedReader {
// We have a stream for included column, but in future it might have no data streams.
// It's more like "has at least one column included that has an index stream".
hasIndexOnlyCols = hasIndexOnlyCols | included[colIx];
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
}
offset += length;
@@ -302,7 +319,7 @@ public class EncodedReaderImpl implements EncodedReader {
includedRgs = colRgs[colRgIx];
ctx = colCtxs[colRgIx] = COLCTX_POOL.take();
ctx.init(colIx, encodings.get(colIx), indexes[colIx]);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
}
} else {
@@ -312,13 +329,13 @@ public class EncodedReaderImpl implements EncodedReader {
int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
ctx.addStream(offset, stream, indexIx);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
+ ", " + length + ", index position " + indexIx);
}
if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
}
} else {
@@ -344,35 +361,23 @@ public class EncodedReaderImpl implements EncodedReader {
}
// 2. Now, read all of the ranges from cache or disk.
- CacheListHelper toRead = new CacheListHelper(listToRead.get());
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get());
+ if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
LOG.info("Resulting disk ranges to read (file " + fileId + "): "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
- cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY, qfCounters);
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ BooleanRef result = new BooleanRef();
+ cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, result);
+ if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
- if (!toRead.didGetAllData) {
- long startTime = qfCounters.startTimeCounter();
- if (this.file == null) {
- this.file = fs.open(path);
- this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null;
- this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
- }
- // Force direct buffers if we will be decompressing to direct cache.
- RecordReaderUtils.readDiskRanges(
- file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc());
- qfCounters.recordHdfsTime(startTime);
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
- LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset
- + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+ if (!result.value) {
+ if (!isDataReaderOpen) {
+ this.dataReader.open();
+ isDataReaderOpen = true;
}
+ dataReader.readFileData(toRead.next, stripeOffset, cache.getAllocator().isDirectAlloc());
}
// 3. For uncompressed case, we need some special processing before read.
@@ -382,15 +387,14 @@ public class EncodedReaderImpl implements EncodedReader {
ColumnReadContext ctx = colCtxs[colIxMod];
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- DiskRangeList newIter = InStream.preReadUncompressedStream(fileId, stripeOffset,
- iter, sctx.offset, sctx.offset + sctx.length, zcr, cache, qfCounters);
+ DiskRangeList newIter = preReadUncompressedStream(
+ stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
if (newIter != null) {
iter = newIter;
}
}
}
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Disk ranges after pre-read (file " + fileId + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
@@ -418,10 +422,10 @@ public class EncodedReaderImpl implements EncodedReader {
ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- StreamBuffer cb = null;
+ ColumnStreamData cb = null;
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
}
@@ -436,9 +440,9 @@ public class EncodedReaderImpl implements EncodedReader {
// For stripe-level streams we don't need the extra refcount on the block.
// See class comment about refcounts.
long unlockUntilCOffset = sctx.offset + sctx.length;
- DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset, iter,
- sctx.offset, sctx.offset + sctx.length, zcr, codec, bufferSize, cache,
- sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset, qfCounters);
+ DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
+ sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
+ unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
iter = lastCached;
}
@@ -459,11 +463,11 @@ public class EncodedReaderImpl implements EncodedReader {
cb = createRgStreamBuffer(
rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
boolean isStartOfStream = sctx.bufferIter == null;
- DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset,
- (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, zcr, codec,
- bufferSize, cache, cb, unlockUntilCOffset, sctx.offset, qfCounters);
+ DiskRangeList lastCached = readEncodedStream(stripeOffset,
+ (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
+ unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
- sctx.bufferIter = iter = lastCached; // Reset iter just to ensure it's valid
+ sctx.bufferIter = iter = lastCached;
}
}
ecb.setStreamData(colIxMod, streamIx, cb);
@@ -475,25 +479,35 @@ public class EncodedReaderImpl implements EncodedReader {
}
releaseContexts(colCtxs);
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Disk ranges after preparing all the data "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
// Release the unreleased buffers. See class comment about refcounts.
releaseInitialRefcounts(toRead.next);
- InStream.releaseCacheChunksIntoObjectPool(toRead.next);
+ releaseCacheChunksIntoObjectPool(toRead.next);
+ }
+
+
+ private static String arrayToString(boolean[] a) {
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; i < a.length; ++i) {
+ b.append(a[i] ? "1" : "0");
+ }
+ b.append(']');
+ return b.toString();
}
- private StreamBuffer createRgStreamBuffer(int rgIx, boolean isLastRg,
+ private ColumnStreamData createRgStreamBuffer(int rgIx, boolean isLastRg,
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
- StreamBuffer cb;
+ ColumnStreamData cb;
cb = SB_POOL.take();
cb.init(sctx.kind.getNumber());
cb.incRef();
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")
+ "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
+ sctx.length + " index position " + sctx.streamIndexOffset + ": " +
@@ -521,31 +535,869 @@ public class EncodedReaderImpl implements EncodedReader {
while (current != null) {
DiskRangeList toFree = current;
current = current.next;
- if (!(toFree instanceof TrackedCacheChunk)) continue;
- TrackedCacheChunk cc = (TrackedCacheChunk)toFree;
- if (cc.isReleased) continue;
- LlapMemoryBuffer buffer = ((CacheChunk)toFree).buffer;
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Unlocking " + buffer + " for the fetching thread at the end");
- }
+ if (!(toFree instanceof CacheChunk)) continue;
+ CacheChunk cc = (CacheChunk)toFree;
+ if (cc.getBuffer() == null) continue;
+ MemoryBuffer buffer = cc.getBuffer();
cache.releaseBuffer(buffer);
- cc.isReleased = true;
+ cc.setBuffer(null);
}
}
+ @Override
+ public void setDebugTracing(boolean isEnabled) {
+ this.isDebugTracingEnabled = isEnabled;
+ }
+
@Override
public void close() throws IOException {
- if (file != null) {
- try {
- file.close();
- } catch (IOException ex) {
- // Tez might have closed our filesystem. Log and ignore error.
- LOG.info("Failed to close file; ignoring: " + ex.getMessage());
- }
- }
+ dataReader.close();
if (pool != null) {
pool.clear();
}
}
+
+ /** DiskRange containing encoded, uncompressed data from cache. */
+ @VisibleForTesting
+ public static class CacheChunk extends DiskRangeList {
+ protected MemoryBuffer buffer;
+
+ public CacheChunk() {
+ super(-1, -1);
+ }
+
+ public void init(MemoryBuffer buffer, long offset, long end) {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.end = end;
+ }
+
+ @Override
+ public boolean hasData() {
+ return buffer != null;
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ // Callers duplicate the buffer, they have to for BufferChunk
+ return buffer.getByteBufferRaw();
+ }
+
+ @Override
+ public String toString() {
+ return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
+ }
+
+ @Override
+ public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
+ throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
+ + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
+ }
+
+ public MemoryBuffer getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(MemoryBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public void handleCacheCollision(DataCache cache,
+ MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reset() {
+ init(null, -1, -1);
+ }
+ }
+
+ /**
+ * Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
+ * Makes assumptions about preRead code; for example, we add chunks here when they are
+ * already in the linked list, without unlinking. So, we record the start position in the
+ * original list, and then, when someone adds the next element, we merely increase the number
+ * of elements one has to traverse from that position to get the whole list.
+ */
+ private static class UncompressedCacheChunk extends CacheChunk {
+ private BufferChunk chunk;
+ private int count;
+
+ public UncompressedCacheChunk(BufferChunk bc) {
+ super();
+ init(null, bc.getOffset(), bc.getEnd());
+ chunk = bc;
+ count = 1;
+ }
+
+ public void addChunk(BufferChunk bc) {
+ assert bc.getOffset() == this.getEnd();
+ this.end = bc.getEnd();
+ ++count;
+ }
+
+ public BufferChunk getChunk() {
+ return chunk;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void handleCacheCollision(DataCache cache,
+ MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+ assert cacheBuffers == null;
+ // This is done at pre-read stage where there's nothing special w/refcounts. Just release.
+ cache.getAllocator().deallocate(getBuffer());
+ // Replace the buffer in our big range list, as well as in current results.
+ this.setBuffer(replacementBuffer);
+ }
+
+ public void clear() {
+ this.chunk = null;
+ this.count = -1;
+ }
+ }
+
+ /**
+ * CacheChunk that is pre-created for new cache data; initially, it contains an original disk
+ * buffer and an unallocated MemoryBuffer object. Before we expose it, the MB is allocated,
+ * the data is decompressed, and original compressed data is discarded. The chunk lives on in
+ * the DiskRange list created for the request, and everyone treats it like regular CacheChunk.
+ */
+ private static class ProcCacheChunk extends CacheChunk {
+ public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
+ ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) {
+ super.init(targetBuffer, cbStartOffset, cbEndOffset);
+ this.isOriginalDataCompressed = isCompressed;
+ this.originalData = originalData;
+ this.originalCbIndex = originalCbIndex;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.originalData = null;
+ }
+
+ @Override
+ public void handleCacheCollision(DataCache cache, MemoryBuffer replacementBuffer,
+ List<MemoryBuffer> cacheBuffers) {
+ assert originalCbIndex >= 0;
+ // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
+ // and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
+ // is not in cache.
+ cache.getAllocator().deallocate(getBuffer());
+ cache.reuseBuffer(replacementBuffer);
+ // Replace the buffer in our big range list, as well as in current results.
+ this.buffer = replacementBuffer;
+ cacheBuffers.set(originalCbIndex, replacementBuffer);
+ originalCbIndex = -1; // This can only happen once at decompress time.
+ }
+
+ /** Original data that will be turned into encoded cache data in this.buffer and reset. */
+ private ByteBuffer originalData = null;
+ /** Whether originalData is compressed. */
+ private boolean isOriginalDataCompressed;
+ /** Index of the MemoryBuffer corresponding to this object inside the result list. If we
+ * hit a cache collision, we will replace this memory buffer with the one from cache at
+ * this index, without having to look for it. */
+ private int originalCbIndex;
+ }
+
+ /**
+ * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
+ * and remove what we have returned. We will keep iterator as a "hint" point.
+ * @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
+ * @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
+ * @param cOffset Start offset to decompress.
+ * @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
+ * @param streamBuffer Stream buffer, to add the results.
+ * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
+ * they will not be used in future calls (see the class comment in
+ * EncodedReaderImpl about refcounts).
+ * @return Last buffer cached during decompression. Cache buffers are never removed from
+ * the master list, so they are safe to keep as iterators for various streams.
+ */
+ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
+ long endCOffset, ColumnStreamData streamBuffer, long unlockUntilCOffset, long streamOffset)
+ throws IOException {
+ if (streamBuffer.getCacheBuffers() == null) {
+ streamBuffer.setCacheBuffers(new ArrayList<MemoryBuffer>());
+ } else {
+ streamBuffer.getCacheBuffers().clear();
+ }
+ if (cOffset == endCOffset) return null;
+ boolean isCompressed = codec != null;
+ List<ProcCacheChunk> toDecompress = null;
+ List<ByteBuffer> toRelease = null;
+ if (isCompressed) {
+ toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+ toDecompress = new ArrayList<ProcCacheChunk>();
+ }
+
+ // 1. Find our bearings in the stream. Normally, iter will already point either to where we
+ // want to be, or just before. However, RGs can overlap due to encoding, so we may have
+ // to return to a previous block.
+ DiskRangeList current = findExactPosition(start, cOffset);
+ if (isDebugTracingEnabled) {
+ LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " + current);
+ }
+
+ CacheChunk lastUncompressed = null;
+
+ // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
+ lastUncompressed = isCompressed ?
+ prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
+ unlockUntilCOffset, current, streamBuffer, toRelease, toDecompress)
+ : prepareRangesForUncompressedRead(
+ cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, streamBuffer);
+
+ // 3. Allocate the buffers, prepare cache keys.
+ // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
+ // data and some unallocated membufs for decompression. toDecompress contains all the work we
+ // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
+ // has also been adjusted to point to these buffers instead of compressed data for the ranges.
+ if (toDecompress == null) return lastUncompressed; // Nothing to decompress.
+
+ MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
+ DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
+ int ix = 0;
+ for (ProcCacheChunk chunk : toDecompress) {
+ cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
+ targetBuffers[ix] = chunk.getBuffer();
+ ++ix;
+ }
+ cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+
+ // 4. Now decompress (or copy) the data into cache buffers.
+ for (ProcCacheChunk chunk : toDecompress) {
+ ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
+ if (chunk.isOriginalDataCompressed) {
+ decompressChunk(chunk.originalData, codec, dest);
+ } else {
+ copyUncompressedChunk(chunk.originalData, dest);
+ }
+
+ chunk.originalData = null;
+ if (isDebugTracingEnabled) {
+ LOG.info("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+ }
+ cache.reuseBuffer(chunk.getBuffer());
+ }
+
+ // 5. Release original compressed buffers to zero-copy reader if needed.
+ if (toRelease != null) {
+ assert dataReader.isTrackingDiskRanges();
+ for (ByteBuffer buffer : toRelease) {
+ dataReader.releaseBuffer(buffer);
+ }
+ }
+
+ // 6. Finally, put uncompressed data to cache.
+ long[] collisionMask = cache.putFileData(fileId, cacheKeys, targetBuffers, baseOffset);
+ processCacheCollisions(collisionMask, toDecompress, targetBuffers, streamBuffer.getCacheBuffers());
+
+ // 7. It may happen that we know we won't use some compression buffers anymore.
+ // Release initial refcounts.
+ for (ProcCacheChunk chunk : toDecompress) {
+ ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
+ }
+
+ return lastUncompressed;
+ }
+
+ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer,
+ List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
+ if (cOffset > current.getOffset()) {
+ // Target compression block is in the middle of the range; slice the range in two.
+ current = current.split(cOffset).next;
+ }
+ long currentOffset = cOffset;
+ CacheChunk lastUncompressed = null;
+ while (true) {
+ DiskRangeList next = null;
+ if (current instanceof CacheChunk) {
+ // 2a. This is a decoded compression buffer, add as is.
+ CacheChunk cc = (CacheChunk)current;
+ if (isDebugTracingEnabled) {
+ LOG.info("Locking " + cc.getBuffer() + " due to reuse");
+ }
+ cache.reuseBuffer(cc.getBuffer());
+ streamBuffer.getCacheBuffers().add(cc.getBuffer());
+ currentOffset = cc.getEnd();
+ if (isDebugTracingEnabled) {
+ LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer());
+ }
+ ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc);
+ lastUncompressed = cc;
+ next = current.next;
+ } else if (current instanceof IncompleteCb) {
+ // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
+ if (isDebugTracingEnabled) {
+ LOG.info("Cannot read " + current);
+ }
+ next = null;
+ currentOffset = -1;
+ } else {
+ // 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
+ // several disk ranges, so we might need to combine them.
+ BufferChunk bc = (BufferChunk)current;
+ ProcCacheChunk newCached = addOneCompressionBuffer(
+ bc, streamBuffer.getCacheBuffers(), toDecompress, toRelease);
+ lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
+ next = (newCached != null) ? newCached.next : null;
+ currentOffset = (next != null) ? next.getOffset() : -1;
+ }
+
+ if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
+ break;
+ }
+ current = next;
+ }
+ return lastUncompressed;
+ }
+
+ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer)
+ throws IOException {
+ long currentOffset = cOffset;
+ CacheChunk lastUncompressed = null;
+ boolean isFirst = true;
+ while (true) {
+ DiskRangeList next = null;
+ assert current instanceof CacheChunk;
+ lastUncompressed = (CacheChunk)current;
+ if (isDebugTracingEnabled) {
+ LOG.info("Locking " + lastUncompressed.getBuffer() + " due to reuse");
+ }
+ cache.reuseBuffer(lastUncompressed.getBuffer());
+ if (isFirst) {
+ streamBuffer.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
+ isFirst = false;
+ }
+ streamBuffer.getCacheBuffers().add(lastUncompressed.getBuffer());
+ currentOffset = lastUncompressed.getEnd();
+ if (isDebugTracingEnabled) {
+ LOG.info("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
+ }
+ ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed);
+ next = current.next;
+ if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
+ break;
+ }
+ current = next;
+ }
+ return lastUncompressed;
+ }
+
+ /**
+ * To achieve some sort of consistent cache boundaries, we will cache streams deterministically;
+ * in segments starting w/stream start, and going for either stream size or maximum allocation.
+ * If we are not reading the entire segment's worth of data, then we will not cache the partial
+ * RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA
+ * to handle just for this case.
+ * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
+ * allocator. Uncompressed case is not mainline though so let's not complicate it.
+ */
+ private DiskRangeList preReadUncompressedStream(long baseOffset,
+ DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+ if (streamOffset == streamEnd) return null;
+ List<UncompressedCacheChunk> toCache = null;
+ List<ByteBuffer> toRelease = null;
+
+ // 1. Find our bearings in the stream.
+ DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
+ if (isDebugTracingEnabled) {
+ LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
+ }
+
+ if (streamOffset > current.getOffset()) {
+ // Target compression block is in the middle of the range; slice the range in two.
+ current = current.split(streamOffset).next;
+ }
+ // Account for maximum cache buffer size.
+ long streamLen = streamEnd - streamOffset;
+ int partSize = cache.getAllocator().getMaxAllocation(),
+ partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
+ long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
+
+ CacheChunk lastUncompressed = null;
+ MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
+ for (int i = 0; i < partCount; ++i) {
+ long hasEntirePartTo = -1;
+ if (partOffset == current.getOffset()) {
+ hasEntirePartTo = partOffset;
+ // We assume cache chunks would always match the way we read, so check and skip it.
+ if (current instanceof CacheChunk) {
+ lastUncompressed = (CacheChunk)current;
+ assert current.getOffset() == partOffset && current.getEnd() == partEnd;
+ partOffset = partEnd;
+ partEnd = Math.min(partOffset + partSize, streamEnd);
+ continue;
+ }
+ }
+ if (current.getOffset() >= partEnd) {
+ // We have no data at all for this part of the stream (could be unneeded), skip.
+ partOffset = partEnd;
+ partEnd = Math.min(partOffset + partSize, streamEnd);
+ continue;
+ }
+ if (toRelease == null && dataReader.isTrackingDiskRanges()) {
+ toRelease = new ArrayList<ByteBuffer>();
+ }
+ // We have some disk buffers... see if we have entire part, etc.
+ UncompressedCacheChunk candidateCached = null;
+ DiskRangeList next = current;
+ while (true) {
+ if (next == null || next.getOffset() >= partEnd) {
+ if (hasEntirePartTo < partEnd && candidateCached != null) {
+ // We are missing a section at the end of the part...
+ lastUncompressed = copyAndReplaceCandidateToNonCached(
+ candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+ candidateCached = null;
+ }
+ break;
+ }
+ current = next;
+ boolean wasSplit = (current.getEnd() > partEnd);
+ if (wasSplit) {
+ current = current.split(partEnd);
+ }
+ if (isDebugTracingEnabled) {
+ LOG.info("Processing uncompressed file data at ["
+ + current.getOffset() + ", " + current.getEnd() + ")");
+ }
+ BufferChunk bc = (BufferChunk)current;
+ if (!wasSplit && toRelease != null) {
+ toRelease.add(bc.chunk); // TODO: is it valid to give zcr the modified 2nd part?
+ }
+
+ // Track if we still have the entire part.
+ long hadEntirePartTo = hasEntirePartTo;
+ if (hasEntirePartTo != -1) {
+ hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
+ }
+ if (candidateCached != null && hasEntirePartTo == -1) {
+ lastUncompressed = copyAndReplaceCandidateToNonCached(
+ candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+ candidateCached = null;
+ }
+
+ if (hasEntirePartTo != -1) {
+ // So far we have all the data from the beginning of the part.
+ if (candidateCached == null) {
+ candidateCached = new UncompressedCacheChunk(bc);
+ } else {
+ candidateCached.addChunk(bc);
+ }
+ // We will take care of this at the end of the part, or if we find a gap.
+ next = current.next;
+ continue;
+ }
+ // We don't have the entire part; just copy to an allocated buffer. We could try to
+ // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
+ lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
+ next = lastUncompressed.next;
+ }
+ if (candidateCached != null) {
+ if (toCache == null) {
+ toCache = new ArrayList<>(partCount - i);
+ }
+ toCache.add(candidateCached);
+ }
+ }
+
+ // 3. Allocate the buffers, prepare cache keys.
+ if (toCache == null) return lastUncompressed; // Nothing to copy and cache.
+
+ MemoryBuffer[] targetBuffers =
+ toCache.size() == 1 ? singleAlloc : new MemoryBuffer[toCache.size()];
+ targetBuffers[0] = null;
+ DiskRange[] cacheKeys = new DiskRange[toCache.size()];
+ int ix = 0;
+ for (UncompressedCacheChunk chunk : toCache) {
+ cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
+ ++ix;
+ }
+ cache.getAllocator().allocateMultiple(
+ targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
+
+ // 4. Now copy the data into cache buffers.
+ ix = 0;
+ for (UncompressedCacheChunk candidateCached : toCache) {
+ candidateCached.setBuffer(targetBuffers[ix]);
+ ByteBuffer dest = candidateCached.getBuffer().getByteBufferRaw();
+ copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached);
+ candidateCached.clear();
+ lastUncompressed = candidateCached;
+ ++ix;
+ }
+
+ // 5. Release original compressed buffers to zero-copy reader if needed.
+ if (toRelease != null) {
+ assert dataReader.isTrackingDiskRanges();
+ for (ByteBuffer buf : toRelease) {
+ dataReader.releaseBuffer(buf);
+ }
+ }
+
+ // 6. Finally, put uncompressed data to cache.
+ long[] collisionMask = cache.putFileData(fileId, cacheKeys, targetBuffers, baseOffset);
+ processCacheCollisions(collisionMask, toCache, targetBuffers, null);
+
+ return lastUncompressed;
+ }
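The loop above slices the uncompressed stream into parts no larger than the allocator's maximum allocation before caching. A minimal sketch of that sizing arithmetic, with hypothetical names (PartSizing and partCount are illustrative, not from the patch):

import static java.lang.Math.min;

public class PartSizing {
  // Number of max-allocation-sized parts needed to cover streamLen bytes.
  static int partCount(long streamLen, int maxAlloc) {
    return (int) ((streamLen / maxAlloc) + (((streamLen % maxAlloc) != 0) ? 1 : 0));
  }

  public static void main(String[] args) {
    long streamOffset = 0, streamEnd = 10L << 20;   // a 10 MB stream
    int maxAlloc = 4 << 20;                         // 4 MB max allocation
    long partOffset = streamOffset, partEnd = min(partOffset + maxAlloc, streamEnd);
    for (int i = 0; i < partCount(streamEnd - streamOffset, maxAlloc); ++i) {
      System.out.println("part " + i + ": [" + partOffset + ", " + partEnd + ")");
      partOffset = partEnd;
      partEnd = min(partOffset + maxAlloc, streamEnd);
    }
    // Prints three parts: [0, 4MB), [4MB, 8MB), [8MB, 10MB).
  }
}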
+
+ private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
+ int startPos = dest.position(), startLim = dest.limit();
+ dest.put(src); // Copy uncompressed data to cache.
+ // Put moves position forward by the size of the data.
+ int newPos = dest.position();
+ if (newPos > startLim) {
+ throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ + ") became [" + newPos + ", " + dest.limit() + ")");
+ }
+ dest.position(startPos);
+ dest.limit(newPos);
+ }
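copyUncompressedChunk relies on a common ByteBuffer idiom: after put() advances the destination position, rewind the position to where the copy started and set the limit to the new position, so the buffer exposes exactly the copied bytes. A self-contained illustration (not part of the patch):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SliceAfterCopy {
  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    ByteBuffer dest = ByteBuffer.allocate(16);
    int startPos = dest.position();
    dest.put(src);                 // advances dest.position() by 5
    int newPos = dest.position();
    dest.position(startPos);       // rewind to the start of the copied region
    dest.limit(newPos);            // expose exactly the copied bytes
    byte[] out = new byte[dest.remaining()];
    dest.get(out);
    System.out.println(new String(out, StandardCharsets.UTF_8)); // hello
  }
}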
+
+
+ private static CacheChunk copyAndReplaceCandidateToNonCached(
+ UncompressedCacheChunk candidateCached, long partOffset,
+ long candidateEnd, DataCache cache, MemoryBuffer[] singleAlloc) {
+ // We thought we had the entire part to cache, but we don't; convert start to
+ // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
+ singleAlloc[0] = null;
+ cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+
+ MemoryBuffer buffer = singleAlloc[0];
+ cache.reuseBuffer(buffer);
+ ByteBuffer dest = buffer.getByteBufferRaw();
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, partOffset, candidateEnd);
+ copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
+ return tcc;
+ }
+
+ private static CacheChunk copyAndReplaceUncompressedToNonCached(
+ BufferChunk bc, DataCache cache, MemoryBuffer[] singleAlloc) {
+ singleAlloc[0] = null;
+ cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+ MemoryBuffer buffer = singleAlloc[0];
+ cache.reuseBuffer(buffer);
+ ByteBuffer dest = buffer.getByteBufferRaw();
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, bc.getOffset(), bc.getEnd());
+ copyUncompressedChunk(bc.chunk, dest);
+ bc.replaceSelfWith(tcc);
+ return tcc;
+ }
+
+ private static void copyAndReplaceUncompressedChunks(
+ UncompressedCacheChunk candidateCached, ByteBuffer dest, CacheChunk tcc) {
+ int startPos = dest.position(), startLim = dest.limit();
+ BufferChunk chunk = candidateCached.getChunk();
+ for (int i = 0; i < candidateCached.getCount(); ++i) {
+ dest.put(chunk.getData());
+ BufferChunk next = (BufferChunk)(chunk.next);
+ if (i == 0) {
+ chunk.replaceSelfWith(tcc);
+ } else {
+ chunk.removeSelf();
+ }
+ chunk = next;
+ }
+ int newPos = dest.position();
+ if (newPos > startLim) {
+ throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ + ") became [" + newPos + ", " + dest.limit() + ")");
+ }
+ dest.position(startPos);
+ dest.limit(newPos);
+ }
+
+ private static void decompressChunk(
+ ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
+ int startPos = dest.position(), startLim = dest.limit();
+ codec.decompress(src, dest);
+ // Codec resets the position to 0 and limit to correct limit.
+ dest.position(startPos);
+ int newLim = dest.limit();
+ if (newLim > startLim) {
+ throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
+ + ") became [" + dest.position() + ", " + newLim + ")");
+ }
+ }
+
+ public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
+ while (current != null) {
+ if (current instanceof ProcCacheChunk) {
+ PCC_POOL.offer((ProcCacheChunk)current);
+ } else if (current instanceof CacheChunk) {
+ TCC_POOL.offer((CacheChunk)current);
+ }
+ current = current.next;
+ }
+ }
+
+ private void ponderReleaseInitialRefcount(
+ long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) {
+ if (cc.getEnd() > unlockUntilCOffset) return;
+ assert cc.getBuffer() != null;
+ releaseInitialRefcount(cc, false);
+ // Release all the previous buffers that we may not have been able to release due to reuse.
+ DiskRangeList prev = cc.prev;
+ while (true) {
+ if ((prev == null) || (prev.getEnd() <= streamStartOffset)
+ || !(prev instanceof CacheChunk)) break;
+ CacheChunk prevCc = (CacheChunk)prev;
+ if (prevCc.buffer == null) break;
+ releaseInitialRefcount(prevCc, true);
+ prev = prev.prev;
+ }
+ }
+
+ private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) {
+ // This is the last RG for which this buffer will be used. Remove the initial refcount.
+ if (isDebugTracingEnabled) {
+ LOG.info("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ + (isBacktracking ? "; backtracking" : ""));
+ }
+ cache.releaseBuffer(cc.getBuffer());
+ cc.setBuffer(null);
+ }
+
+ private void processCacheCollisions(long[] collisionMask,
+ List<? extends CacheChunk> toDecompress, MemoryBuffer[] targetBuffers,
+ List<MemoryBuffer> cacheBuffers) {
+ if (collisionMask == null) return;
+ assert collisionMask.length >= (toDecompress.size() >>> 6);
+ // There are some elements that were cached in parallel, take care of them.
+ long maskVal = -1;
+ for (int i = 0; i < toDecompress.size(); ++i) {
+ if ((i & 63) == 0) {
+ maskVal = collisionMask[i >>> 6];
+ }
+ if ((maskVal & 1) == 1) {
+ // Cache has found an old buffer for the key and put it into the array instead of our new one.
+ CacheChunk replacedChunk = toDecompress.get(i);
+ MemoryBuffer replacementBuffer = targetBuffers[i];
+ if (isDebugTracingEnabled) {
+ LOG.info("Discarding data due to cache collision: " + replacedChunk.getBuffer()
+ + " replaced with " + replacementBuffer);
+ }
+ assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
+ + "even though mask is [" + Long.toBinaryString(maskVal) + "]";
+ replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers);
+ }
+ maskVal >>= 1;
+ }
+ }
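processCacheCollisions reads the collision mask as 64 entries per long: bit (i & 63) of collisionMask[i >>> 6] says whether entry i lost a race to a concurrently cached buffer. A small, hypothetical walk-through of the same bit arithmetic (names are illustrative):

public class CollisionMaskWalk {
  static void walk(long[] collisionMask, int entryCount) {
    long maskVal = -1;
    for (int i = 0; i < entryCount; ++i) {
      if ((i & 63) == 0) {
        maskVal = collisionMask[i >>> 6];   // load the next 64-entry word
      }
      if ((maskVal & 1) == 1) {
        System.out.println("entry " + i + " collided; keep the cached buffer, discard ours");
      }
      maskVal >>= 1;                        // advance to the next entry's bit
    }
  }

  public static void main(String[] args) {
    walk(new long[] { 0b101L }, 3);         // entries 0 and 2 collided
  }
}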
+
+
+ /** Finds the given compressed offset in a stream and returns the range-list position pointing at it.
+ This may be necessary for obscure combinations of compression and encoding boundaries. */
+ private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) {
+ if (offset < 0) return ranges;
+ return findIntersectingPosition(ranges, offset, offset);
+ }
+
+ private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) {
+ if (offset < 0) return ranges;
+ // We expect the offset to be valid. TODO: validate instead of assuming.
+ while (ranges.getEnd() <= offset) {
+ ranges = ranges.next;
+ }
+ while (ranges.getOffset() > end) {
+ ranges = ranges.prev;
+ }
+ // We are now on some intersecting buffer, find the first intersecting buffer.
+ while (ranges.prev != null && ranges.prev.getEnd() > offset) {
+ ranges = ranges.prev;
+ }
+ return ranges;
+ }
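findIntersectingPosition walks a doubly-linked list of [offset, end) ranges until it lands on the first range that overlaps the requested interval. A minimal stand-alone model of the same walk (Range stands in for DiskRangeList; not part of the patch):

public class RangeWalk {
  static final class Range {
    final long offset, end;
    Range prev, next;
    Range(long offset, long end) { this.offset = offset; this.end = end; }
  }

  static Range findIntersecting(Range r, long offset, long end) {
    while (r.end <= offset) r = r.next;                         // skip ranges entirely before the target
    while (r.offset > end) r = r.prev;                          // back up if we started past it
    while (r.prev != null && r.prev.end > offset) r = r.prev;   // rewind to the first intersecting range
    return r;
  }

  public static void main(String[] args) {
    Range a = new Range(0, 10), b = new Range(10, 20), c = new Range(20, 30);
    a.next = b; b.prev = a; b.next = c; c.prev = b;
    System.out.println(findIntersecting(a, 15, 25).offset);     // 10: [10, 20) is the first overlap
  }
}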
+
+ private static class IncompleteCb extends DiskRangeList {
+ public IncompleteCb(long offset, long end) {
+ super(offset, end);
+ }
+
+ @Override
+ public String toString() {
+ return "incomplete CB start: " + offset + " end: " + end;
+ }
+ }
+
+ /**
+ * Reads one compression block from the source; handles compression blocks read from
+ * multiple ranges (usually, that would only happen with zcr).
+ * Adds stuff to cacheBuffers, toDecompress and toRelease (see below for what each does).
+ * @param current BufferChunk where the compression block starts.
+ * @param cacheBuffers The result buffer list to which the pre-allocated target cache buffer is added.
+ * @param toDecompress The list of work to decompress - pairs of compressed buffers and the
+ * target buffers (same as the ones added to cacheBuffers).
+ * @param toRelease The list of buffers to release to zcr because they are no longer in use.
+ * @return The resulting cache chunk.
+ */
+ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
+ List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
+ List<ByteBuffer> toRelease) throws IOException {
+ ByteBuffer slice = null;
+ ByteBuffer compressed = current.chunk;
+ long cbStartOffset = current.getOffset();
+ int b0 = compressed.get() & 0xff;
+ int b1 = compressed.get() & 0xff;
+ int b2 = compressed.get() & 0xff;
+ int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+ if (chunkLength > bufferSize) {
+ throw new IllegalArgumentException("Buffer size too small. size = " +
+ bufferSize + " needed = " + chunkLength);
+ }
+ int consumedLength = chunkLength + OutStream.HEADER_SIZE;
+ long cbEndOffset = cbStartOffset + consumedLength;
+ boolean isUncompressed = ((b0 & 0x01) == 1);
+ if (isDebugTracingEnabled) {
+ LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
+ + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
+ }
+ if (compressed.remaining() >= chunkLength) {
+ // Simple case - CB fits entirely in the disk range.
+ slice = compressed.slice();
+ slice.limit(chunkLength);
+ ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+ cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
+ if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+ toRelease.add(compressed);
+ }
+ return cc;
+ }
+ if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
+ addIncompleteCompressionBuffer(cbStartOffset, current, 0);
+ return null; // This CB cannot be read from the available chunks.
+ }
+
+ // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
+ // We need to consolidate 2 or more buffers into one to decompress.
+ ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+ int remaining = chunkLength - compressed.remaining();
+ int originalPos = compressed.position();
+ copy.put(compressed);
+ if (isDebugTracingEnabled) {
+ LOG.info("Removing partial CB " + current + " from ranges after copying its contents");
+ }
+ DiskRangeList next = current.next;
+ current.removeSelf();
+ if (dataReader.isTrackingDiskRanges()) {
+ if (originalPos == 0) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ } else {
+ toRelease.add(compressed); // There might be slices depending on this buffer.
+ }
+ }
+
+ int extraChunkCount = 0;
+ while (true) {
+ if (!(next instanceof BufferChunk)) {
+ throw new IOException("Trying to extend compressed block into uncompressed block " + next);
+ }
+ compressed = next.getData();
+ ++extraChunkCount;
+ if (compressed.remaining() >= remaining) {
+ // This is the last range for this compression block. Yay!
+ slice = compressed.slice();
+ slice.limit(remaining);
+ copy.put(slice);
+ ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
+ cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
+ if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ }
+ return cc;
+ }
+ remaining -= compressed.remaining();
+ copy.put(compressed);
+ if (dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ }
+ DiskRangeList tmp = next;
+ next = next.hasContiguousNext() ? next.next : null;
+ if (next != null) {
+ if (isDebugTracingEnabled) {
+ LOG.info("Removing partial CB " + tmp + " from ranges after copying its contents");
+ }
+ tmp.removeSelf();
+ } else {
+ addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount);
+ return null; // This CB cannot be read from the available chunks.
+ }
+ }
+ }
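addOneCompressionBuffer parses the 3-byte ORC compression-block header: the low bit of the first byte marks an uncompressed ("original") block, and the remaining 23 bits, little-endian, give the chunk length. A worked example of that decoding (header bytes chosen for illustration):

public class OrcCbHeader {
  public static void main(String[] args) {
    int b0 = 0x0b, b1 = 0x00, b2 = 0x00;                    // header for a 5-byte original block
    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);   // same formula as in the patch
    boolean isUncompressed = (b0 & 0x01) == 1;
    System.out.println("chunkLength=" + chunkLength + ", uncompressed=" + isUncompressed);
    // 0x0b = 0000_1011: low bit set (original), remaining bits 0000_101 = 5.
  }
}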
+
+ private void addIncompleteCompressionBuffer(
+ long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+ IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
+ if (isDebugTracingEnabled) {
+ LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
+ + icb + " in the buffers");
+ }
+ target.replaceSelfWith(icb);
+ }
+
+ /**
+ * Adds one buffer with compressed data to the results of addOneCompressionBuffer (see its javadoc).
+ * @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data.
+ * @param isUncompressed Whether the data in the block is uncompressed.
+ * @param cbStartOffset Compressed start offset of the fCB.
+ * @param cbEndOffset Compressed end offset of the fCB.
+ * @param lastChunkLength The number of compressed bytes consumed from the last chunk into fullCompressionBlock.
+ * @param lastChunk The chunk from which the last (or all) bytes of the fCB come.
+ * @param toDecompress See addOneCompressionBuffer.
+ * @param cacheBuffers See addOneCompressionBuffer.
+ * @return New cache buffer.
+ */
+ private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
+ boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
+ BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
+ // Prepare future cache buffer.
+ MemoryBuffer futureAlloc = cache.getAllocator().createUnallocated();
+ // Add it to result in order we are processing.
+ cacheBuffers.add(futureAlloc);
+ // Add it to the list of work to decompress.
+ ProcCacheChunk cc = PCC_POOL.take();
+ cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
+ fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
+ toDecompress.add(cc);
+ // Adjust the compression block position.
+ if (isDebugTracingEnabled) {
+ LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
+ }
+ lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
+ // Finally, put it in the ranges list for future use (if shared between RGs).
+ // Before anyone else accesses it, it would have been allocated and decompressed locally.
+ if (lastChunk.chunk.remaining() <= 0) {
+ if (isDebugTracingEnabled) {
+ LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
+ }
+ lastChunk.replaceSelfWith(cc);
+ } else {
+ if (isDebugTracingEnabled) {
+ LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
+ }
+ lastChunk.insertPartBefore(cc);
+ }
+ return cc;
+ }
+
+ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+ return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
index 9e2d281..c546e22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
@@ -20,8 +20,9 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
/**
*
@@ -71,7 +72,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -88,9 +89,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer nanosStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData nanosStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
private boolean skipCorrupt;
@@ -105,17 +106,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setSecondsStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setSecondsStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setNanosStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+ public StreamReaderBuilder setNanosStream(ColumnStreamData secondaryStream) {
this.nanosStream = secondaryStream;
return this;
}
@@ -222,7 +223,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -251,10 +252,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer dictionaryStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData dictionaryStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -268,22 +269,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
this.lengthStream = lengthStream;
return this;
}
- public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+ public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
this.dictionaryStream = dictStream;
return this;
}
@@ -360,7 +361,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -374,8 +375,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -389,12 +390,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -463,7 +464,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -477,8 +478,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
private boolean skipCorrupt;
@@ -493,12 +494,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -572,7 +573,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -586,8 +587,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -601,12 +602,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -675,7 +676,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -689,8 +690,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -703,12 +704,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -771,7 +772,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -785,8 +786,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -799,12 +800,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -878,7 +879,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -895,9 +896,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer valueStream;
- private EncodedColumnBatch.StreamBuffer scaleStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData valueStream;
+ private ColumnStreamData scaleStream;
private int scale;
private int precision;
private CompressionCodec compressionCodec;
@@ -923,17 +924,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setValueStream(EncodedColumnBatch.StreamBuffer valueStream) {
+ public StreamReaderBuilder setValueStream(ColumnStreamData valueStream) {
this.valueStream = valueStream;
return this;
}
- public StreamReaderBuilder setScaleStream(EncodedColumnBatch.StreamBuffer scaleStream) {
+ public StreamReaderBuilder setScaleStream(ColumnStreamData scaleStream) {
this.scaleStream = scaleStream;
return this;
}
@@ -1004,7 +1005,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1018,8 +1019,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1033,12 +1034,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -1138,7 +1139,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1168,10 +1169,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private Long fileId;
private int columnIndex;
private int maxLength;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer dictionaryStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData dictionaryStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1190,22 +1191,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
this.lengthStream = lengthStream;
return this;
}
- public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+ public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
this.dictionaryStream = dictStream;
return this;
}
@@ -1312,7 +1313,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1342,10 +1343,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private Long fileId;
private int columnIndex;
private int maxLength;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer dictionaryStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData dictionaryStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1364,22 +1365,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
this.lengthStream = lengthStream;
return this;
}
- public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+ public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
this.dictionaryStream = dictStream;
return this;
}
@@ -1455,7 +1456,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1469,8 +1470,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -1483,12 +1484,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -1561,7 +1562,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1578,9 +1579,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1594,17 +1595,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData secondaryStream) {
this.lengthStream = secondaryStream;
return this;
}
@@ -1673,7 +1674,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1687,8 +1688,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -1701,12 +1702,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -1740,11 +1741,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
List<OrcProto.ColumnEncoding> encodings,
EncodedColumnBatch<OrcBatchKey> batch,
CompressionCodec codec, boolean skipCorrupt) throws IOException {
- long file = batch.batchKey.file;
+ long file = batch.getBatchKey().file;
TreeReader[] treeReaders = new TreeReader[numCols];
for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.columnIxs[i];
- EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+ int columnIndex = batch.getColumnIxs()[i];
+ ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
OrcProto.Type columnType = types.get(columnIndex);
// EncodedColumnBatch is already decompressed, we don't really need to pass codec.
@@ -1756,13 +1757,13 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
// stream buffers are arranged in enum order of stream kind
- EncodedColumnBatch.StreamBuffer present = null;
- EncodedColumnBatch.StreamBuffer data = null;
- EncodedColumnBatch.StreamBuffer dictionary = null;
- EncodedColumnBatch.StreamBuffer lengths = null;
- EncodedColumnBatch.StreamBuffer secondary = null;
- for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
- switch (streamBuffer.streamKind) {
+ ColumnStreamData present = null;
+ ColumnStreamData data = null;
+ ColumnStreamData dictionary = null;
+ ColumnStreamData lengths = null;
+ ColumnStreamData secondary = null;
+ for (ColumnStreamData streamBuffer : streamBuffers) {
+ switch (streamBuffer.getStreamKind()) {
case 0:
// PRESENT stream
present = streamBuffer;
@@ -1784,7 +1785,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
secondary = streamBuffer;
break;
default:
- throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+ throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index c16be38..d51acb5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.hive.llap.io.decode;
import java.io.IOException;
-import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
@@ -31,6 +30,8 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
import org.apache.hadoop.hive.ql.io.orc.EncodedTreeReaderFactory;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
@@ -71,14 +72,14 @@ public class OrcEncodedDataConsumer
protected void decodeBatch(OrcEncodedColumnBatch batch,
Consumer<ColumnVectorBatch> downstreamConsumer) {
long startTime = counters.startTimeCounter();
- int currentStripeIndex = batch.batchKey.stripeIx;
+ int currentStripeIndex = batch.getBatchKey().stripeIx;
boolean sameStripe = currentStripeIndex == previousStripeIndex;
try {
OrcStripeMetadata stripeMetadata = stripes[currentStripeIndex];
// Get non null row count from root column, to get max vector batches
- int rgIdx = batch.batchKey.rgIx;
+ int rgIdx = batch.getBatchKey().rgIx;
long nonNullRowCount = -1;
if (rgIdx == OrcEncodedColumnBatch.ALL_RGS) {
nonNullRowCount = stripeMetadata.getRowCount();
@@ -88,7 +89,7 @@ public class OrcEncodedDataConsumer
}
int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
- int numCols = batch.columnIxs.length;
+ int numCols = batch.getColumnIxs().length;
if (columnReaders == null || !sameStripe) {
this.columnReaders = EncodedTreeReaderFactory.createEncodedTreeReader(numCols,
fileMetadata.getTypes(), stripeMetadata.getEncodings(), batch, codec, skipCorrupt);
@@ -106,10 +107,10 @@ public class OrcEncodedDataConsumer
}
ColumnVectorBatch cvb = cvbPool.take();
- assert cvb.cols.length == batch.columnIxs.length; // Must be constant per split.
+ assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split.
cvb.size = batchSize;
- for (int idx = 0; idx < batch.columnIxs.length; idx++) {
+ for (int idx = 0; idx < batch.getColumnIxs().length; idx++) {
cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
}
@@ -130,8 +131,8 @@ public class OrcEncodedDataConsumer
EncodedColumnBatch<OrcBatchKey> batch, int numCols,
OrcStripeMetadata stripeMetadata) throws IOException {
for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.columnIxs[i];
- int rowGroupIndex = batch.batchKey.rgIx;
+ int columnIndex = batch.getColumnIxs()[i];
+ int rowGroupIndex = batch.getBatchKey().rgIx;
OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
columnReaders[i].seek(new RecordReaderImpl.PositionProviderImpl(rowIndexEntry));
@@ -142,9 +143,9 @@ public class OrcEncodedDataConsumer
EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe, int numCols,
OrcStripeMetadata stripeMetadata) throws IOException {
for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.columnIxs[i];
- int rowGroupIndex = batch.batchKey.rgIx;
- EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+ int columnIndex = batch.getColumnIxs()[i];
+ int rowGroupIndex = batch.getBatchKey().rgIx;
+ ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
columnReaders[i].setBuffers(streamBuffers, sameStripe);
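decodeBatch above sizes the work per row group from the non-null row count: with VectorizedRowBatch.DEFAULT_SIZE rows per vector batch (1024 in Hive, assumed here), a row group is emitted as full batches plus one partial batch. A rough illustration of that arithmetic (illustrative only):

public class BatchSizing {
  static final int DEFAULT_SIZE = 1024;   // assumed value of VectorizedRowBatch.DEFAULT_SIZE

  public static void main(String[] args) {
    long nonNullRowCount = 2500;
    int maxBatchesRG = (int) ((nonNullRowCount / DEFAULT_SIZE) + 1);
    System.out.println(maxBatchesRG);     // 3: two full 1024-row batches and one 452-row batch
  }
}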
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 5cf0780..f88a943 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -1,6 +1,7 @@
package org.apache.hadoop.hive.llap.io.encoded;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -12,25 +13,29 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.CallableWithNdc;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.EncodedReader;
import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl;
@@ -44,9 +49,14 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
@@ -235,7 +245,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
try {
ensureOrcReader();
// Reader creating updates HDFS counters, don't do it here.
- stripeReader = orcReader.encodedReader(fileId, lowLevelCache, counters, dataConsumer);
+ DataWrapperForOrc dw = new DataWrapperForOrc();
+ stripeReader = orcReader.encodedReader(fileId, dw, dw);
+ stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
} catch (Throwable t) {
consumer.setError(t);
recordReaderTime(startTime);
@@ -340,7 +352,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
// Also, currently readEncodedColumns is not stoppable. The consumer will discard the
// data it receives for one stripe. We could probably interrupt it, if it checked that.
stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
- stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes, colRgs);
+ stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes,
+ colRgs, dataConsumer);
} catch (Throwable t) {
consumer.setError(t);
cleanupReaders();
@@ -434,7 +447,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
}
}
LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
- return RecordReaderUtils.getFileId(fs, split.getPath());
+ return HdfsUtils.getFileId(fs, split.getPath());
}
private boolean[][] genStripeColRgs(List<Integer> stripeCols, boolean[][] globalColRgs) {
@@ -499,7 +512,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
*/
private void ensureOrcReader() throws IOException {
if (orcReader != null) return;
- Path path = RecordReaderUtils.getFileIdPath(fs, split.getPath(), fileId);
+ Path path = HdfsUtils.getFileIdPath(fs, split.getPath(), fileId);
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Creating reader for " + path + " (" + split.getPath() + ")");
}
@@ -578,15 +591,15 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public void returnData(OrcEncodedColumnBatch ecb) {
- for (StreamBuffer[] datas : ecb.columnData) {
- for (StreamBuffer data : datas) {
+ for (ColumnStreamData[] datas : ecb.getColumnData()) {
+ for (ColumnStreamData data : datas) {
if (data.decRef() != 0) continue;
if (DebugUtils.isTraceLockingEnabled()) {
- for (LlapMemoryBuffer buf : data.cacheBuffers) {
+ for (MemoryBuffer buf : data.getCacheBuffers()) {
LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
}
}
- lowLevelCache.releaseBuffers(data.cacheBuffers);
+ lowLevelCache.releaseBuffers(data.getCacheBuffers());
EncodedReaderImpl.SB_POOL.offer(data);
}
}
@@ -743,12 +756,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS
&& (readMask.length <= rgIx || !readMask[rgIx]))) continue;
key.colIx = columnIds.get(colIxMod);
- StreamBuffer[] cached = cache.get(key);
+ ColumnStreamData[] cached = cache.get(key);
if (cached == null) {
isMissingAnyRgs[colIxMod] = true;
continue;
}
- col.setAllStreams(colIxMod, key.colIx, cached);
+ col.setAllStreamsData(colIxMod, key.colIx, cached);
hasAnyCached = true;
if (readMask == SargApplier.READ_ALL_RGS) {
// We were going to read all RGs, but some were in cache, allocate the mask.
@@ -793,16 +806,16 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public void consumeData(OrcEncodedColumnBatch data) {
// Store object in cache; create new key object - cannot be reused.
assert cache != null;
- for (int i = 0; i < data.columnData.length; ++i) {
- OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIxs[i]);
- StreamBuffer[] toCache = data.columnData[i];
- StreamBuffer[] cached = cache.cacheOrGet(key, toCache);
+ for (int i = 0; i < data.getColumnData().length; ++i) {
+ OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]);
+ ColumnStreamData[] toCache = data.getColumnData()[i];
+ ColumnStreamData[] cached = cache.cacheOrGet(key, toCache);
if (toCache != cached) {
- for (StreamBuffer sb : toCache) {
+ for (ColumnStreamData sb : toCache) {
if (sb.decRef() != 0) continue;
- lowLevelCache.releaseBuffers(sb.cacheBuffers);
+ lowLevelCache.releaseBuffers(sb.getCacheBuffers());
}
- data.columnData[i] = cached;
+ data.getColumnData()[i] = cached;
}
}
consumer.consumeData(data);
@@ -812,4 +825,82 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
public void setError(Throwable t) {
consumer.setError(t);
}
+
+ private class DataWrapperForOrc implements DataReader, DataCache {
+ private DataReader orcDataReader;
+
+ public DataWrapperForOrc() {
+ boolean useZeroCopy = (conf != null)
+ && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY);
+ if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) {
+ throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
+ + "buffers; either disable zero-copy or enable direct cache allocation");
+ }
+ this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy);
+ }
+
+ @Override
+ public DiskRangeList getFileData(long fileId, DiskRangeList range,
+ long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
+ return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
+ }
+
+ @Override
+ public long[] putFileData(long fileId, DiskRange[] ranges,
+ MemoryBuffer[] data, long baseOffset) {
+ return lowLevelCache.putFileData(
+ fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
+ }
+
+ @Override
+ public void releaseBuffer(MemoryBuffer buffer) {
+ lowLevelCache.releaseBuffer(buffer);
+ }
+
+ @Override
+ public void reuseBuffer(MemoryBuffer buffer) {
+ boolean isReused = lowLevelCache.reuseBuffer(buffer);
+ assert isReused;
+ }
+
+ @Override
+ public Allocator getAllocator() {
+ return lowLevelCache.getAllocator();
+ }
+
+ @Override
+ public void close() throws IOException {
+ orcDataReader.close();
+ }
+
+ @Override
+ public DiskRangeList readFileData(DiskRangeList range, long baseOffset,
+ boolean doForceDirect) throws IOException {
+ long startTime = counters.startTimeCounter();
+ DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect);
+ counters.recordHdfsTime(startTime);
+ if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) {
+ LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset
+ + "): " + RecordReaderUtils.stringifyDiskRanges(result));
+ }
+ return result;
+ }
+
+ @Override
+ public boolean isTrackingDiskRanges() {
+ return orcDataReader.isTrackingDiskRanges();
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ orcDataReader.releaseBuffer(buffer);
+ }
+
+ @Override
+ public void open() throws IOException {
+ long startTime = counters.startTimeCounter();
+ orcDataReader.open();
+ counters.recordHdfsTime(startTime);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index dbfe4df..635ba6c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -21,13 +21,10 @@ package org.apache.hadoop.hive.llap.io.metadata;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.hadoop.hive.llap.cache.EvictionListener;
-import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.MemoryManager;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
public class OrcMetadataCache {
private final ConcurrentHashMap<Long, OrcFileMetadata> metadata =
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index c0b8b34..762c9f9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.BloomFilter;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.BloomFilterIndex;
@@ -37,6 +36,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeFooter;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
index 50d5e19..3299135 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
@@ -30,9 +30,9 @@ import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator.AllocatorOutOfMemoryException;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.cache.Allocator.LlapCacheOutOfMemoryException;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.junit.Test;
@@ -191,7 +191,7 @@ public class TestBuddyAllocator {
}
private void allocSameSize(BuddyAllocator a, int allocCount, int sizeLog2) throws Exception {
- LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[allocCount][];
+ MemoryBuffer[][] allocs = new MemoryBuffer[allocCount][];
long[][] testValues = new long[allocCount][];
for (int j = 0; j < allocCount; ++j) {
allocateAndUseBuffer(a, allocs, testValues, 1, j, sizeLog2);
@@ -202,7 +202,7 @@ public class TestBuddyAllocator {
private void allocateUp(BuddyAllocator a, int min, int max, int allocPerSize,
boolean isSameOrderDealloc) throws Exception {
int sizes = max - min + 1;
- LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
+ MemoryBuffer[][] allocs = new MemoryBuffer[sizes][];
// Put in the beginning; relies on the knowledge of internal implementation. Pave?
long[][] testValues = new long[sizes][];
for (int i = min; i <= max; ++i) {
@@ -214,7 +214,7 @@ public class TestBuddyAllocator {
private void allocateDown(BuddyAllocator a, int min, int max, int allocPerSize,
boolean isSameOrderDealloc) throws Exception {
int sizes = max - min + 1;
- LlapMemoryBuffer[][] allocs = new LlapMemoryBuffer[sizes][];
+ MemoryBuffer[][] allocs = new MemoryBuffer[sizes][];
// Put in the beginning; relies on the knowledge of internal implementation. Pave?
long[][] testValues = new long[sizes][];
for (int i = max; i >= min; --i) {
@@ -223,20 +223,20 @@ public class TestBuddyAllocator {
deallocUpOrDown(a, isSameOrderDealloc, allocs, testValues);
}
- private void allocateAndUseBuffer(BuddyAllocator a, LlapMemoryBuffer[][] allocs,
+ private void allocateAndUseBuffer(BuddyAllocator a, MemoryBuffer[][] allocs,
long[][] testValues, int allocCount, int index, int sizeLog2) throws Exception {
- allocs[index] = new LlapMemoryBuffer[allocCount];
+ allocs[index] = new MemoryBuffer[allocCount];
testValues[index] = new long[allocCount];
int size = (1 << sizeLog2) - 1;
try {
a.allocateMultiple(allocs[index], size);
- } catch (LlapCacheOutOfMemoryException ex) {
+ } catch (AllocatorOutOfMemoryException ex) {
LOG.error("Failed to allocate " + allocCount + " of " + size + "; " + a.debugDump());
throw ex;
}
// LOG.info("Allocated " + allocCount + " of " + size + "; " + a.debugDump());
for (int j = 0; j < allocCount; ++j) {
- LlapMemoryBuffer mem = allocs[index][j];
+ MemoryBuffer mem = allocs[index][j];
long testValue = testValues[index][j] = rdm.nextLong();
int pos = mem.getByteBufferRaw().position();
mem.getByteBufferRaw().putLong(pos, testValue);
@@ -248,7 +248,7 @@ public class TestBuddyAllocator {
}
private void deallocUpOrDown(BuddyAllocator a, boolean isSameOrderDealloc,
- LlapMemoryBuffer[][] allocs, long[][] testValues) {
+ MemoryBuffer[][] allocs, long[][] testValues) {
if (isSameOrderDealloc) {
for (int i = 0; i < allocs.length; ++i) {
deallocBuffers(a, allocs[i], testValues[i]);
@@ -261,7 +261,7 @@ public class TestBuddyAllocator {
}
private void deallocBuffers(
- BuddyAllocator a, LlapMemoryBuffer[] allocs, long[] testValues) {
+ BuddyAllocator a, MemoryBuffer[] allocs, long[] testValues) {
for (int j = 0; j < allocs.length; ++j) {
LlapDataBuffer mem = (LlapDataBuffer)allocs[j];
int pos = mem.getByteBufferRaw().position();
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
index 4697445..382e948 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
@@ -54,6 +53,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcProto.TimestampStatistics;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Index;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.junit.Test;
import org.mockito.Mockito;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
index 2c87ec1..d4b8a46 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
@@ -35,26 +35,28 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheChunkFactory;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.CacheChunk;
import org.junit.Test;
public class TestLowLevelCacheImpl {
private static final Log LOG = LogFactory.getLog(TestLowLevelCacheImpl.class);
- private static final CacheChunkFactory testFactory = new CacheChunkFactory() {
- public DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
- return new CacheChunk(buffer, offset, end);
+ private static final DiskRangeListFactory testFactory = new DiskRangeListFactory() {
+ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+ CacheChunk cc = new CacheChunk();
+ cc.init(buffer, offset, end);
+ return cc;
}
};
private static class DummyAllocator implements EvictionAwareAllocator {
@Override
- public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ public void allocateMultiple(MemoryBuffer[] dest, int size) {
for (int i = 0; i < dest.length; ++i) {
LlapDataBuffer buf = new LlapDataBuffer();
buf.initialize(0, null, -1, size);
@@ -63,11 +65,11 @@ public class TestLowLevelCacheImpl {
}
@Override
- public void deallocate(LlapMemoryBuffer buffer) {
+ public void deallocate(MemoryBuffer buffer) {
}
@Override
- public void deallocateEvicted(LlapMemoryBuffer buffer) {
+ public void deallocateEvicted(MemoryBuffer buffer) {
}
@Override
@@ -79,6 +81,11 @@ public class TestLowLevelCacheImpl {
public int getMaxAllocation() {
return 0;
}
+
+ @Override
+ public MemoryBuffer createUnallocated() {
+ return new LlapDataBuffer();
+ }
}
private static class DummyCachePolicy implements LowLevelCachePolicy {
@@ -115,16 +122,16 @@ public class TestLowLevelCacheImpl {
LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
new DummyAllocator(), true, -1); // no cleanup thread
long fn1 = 1, fn2 = 2;
- LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
+ MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
- assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL));
- assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL));
+ assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3), 0, Priority.NORMAL, null));
verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]);
verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4));
verifyRefcount(fakes, 3, 4, 3, 3, 1, 1);
- LlapMemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
- long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL);
+ MemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
+ long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff, 0, Priority.NORMAL, null);
assertEquals(1, mask.length);
assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache.
assertSame(fakes[0], bufsDiff[1]); // Should have been replaced
@@ -134,7 +141,7 @@ public class TestLowLevelCacheImpl {
}
private void verifyCacheGet(LowLevelCacheImpl cache, long fileId, Object... stuff) {
- DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+ CreateHelper list = new CreateHelper();
DiskRangeList iter = null;
int intCount = 0, lastInt = -1;
int resultCount = stuff.length;
@@ -153,13 +160,13 @@ public class TestLowLevelCacheImpl {
} else if (intCount >= 0) {
assertTrue(intCount == 0);
intCount = -1;
- iter = cache.getFileData(fileId, list.get(), 0, testFactory);
+ iter = cache.getFileData(fileId, list.get(), 0, testFactory, null, null);
assertEquals(resultCount, iter.listSize());
}
assertTrue(iter != null);
- if (obj instanceof LlapMemoryBuffer) {
+ if (obj instanceof MemoryBuffer) {
assertTrue(iter instanceof CacheChunk);
- assertSame(obj, ((CacheChunk)iter).buffer);
+ assertSame(obj, ((CacheChunk)iter).getBuffer());
} else {
assertTrue(iter.equals(obj));
}
@@ -173,9 +180,9 @@ public class TestLowLevelCacheImpl {
LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
new DummyAllocator(), true, -1); // no cleanup thread
long fn = 1;
- LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() };
+ MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() };
assertNull(cache.putFileData(
- fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL));
+ fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null));
verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9));
verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]);
verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5));
@@ -192,9 +199,9 @@ public class TestLowLevelCacheImpl {
LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
new DummyAllocator(), false, -1); // no cleanup thread
long fn = 1;
- LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() };
+ MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb() };
assertNull(cache.putFileData(
- fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL));
+ fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes, 0, Priority.NORMAL, null));
// We expect cache requests from the middle here
verifyCacheGet(cache, fn, 3, 4, fakes[0]);
verifyCacheGet(cache, fn, 3, 7, fakes[0], dr(4, 6), fakes[1]);
@@ -206,9 +213,9 @@ public class TestLowLevelCacheImpl {
LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
new DummyAllocator(), true, -1); // no cleanup thread
long fn1 = 1, fn2 = 2;
- LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() };
- assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL));
- assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL));
+ MemoryBuffer[] fakes = new MemoryBuffer[] { fb(), fb(), fb() };
+ assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2), 0, Priority.NORMAL, null));
verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
verifyCacheGet(cache, fn2, 1, 2, fakes[2]);
verifyRefcount(fakes, 3, 3, 3);
@@ -225,23 +232,23 @@ public class TestLowLevelCacheImpl {
LlapDaemonCacheMetrics.create("test", "1"), new DummyCachePolicy(),
new DummyAllocator(), true, -1); // no cleanup thread
long fn1 = 1, fn2 = 2;
- LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] {
+ MemoryBuffer[] fakes = new MemoryBuffer[] {
fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
- assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL));
- assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL));
+ assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2), 0, Priority.NORMAL, null));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3), 0, Priority.NORMAL, null));
evict(cache, fakes[0]);
evict(cache, fakes[3]);
long[] mask = cache.putFileData(
- fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL);
+ fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7), 0, Priority.NORMAL, null);
assertEquals(1, mask.length);
assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't
- assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8), 0, Priority.NORMAL, null));
verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]);
}
@Test
public void testCacheMetrics() {
- DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+ CreateHelper list = new CreateHelper();
list.addOrMerge(0, 100, true, false);
list.addOrMerge(100, 200, true, false);
list.addOrMerge(200, 300, true, false);
@@ -249,7 +256,7 @@ public class TestLowLevelCacheImpl {
list.addOrMerge(400, 500, true, false);
assertEquals(1, list.get().listSize());
assertEquals(500, list.get().getTotalLength());
- list = new DiskRangeListCreateHelper();
+ list = new CreateHelper();
list.addOrMerge(0, 100, false, false);
list.addOrMerge(100, 200, false, false);
list.addOrMerge(200, 300, false, false);
@@ -257,7 +264,7 @@ public class TestLowLevelCacheImpl {
list.addOrMerge(400, 500, false, false);
assertEquals(5, list.get().listSize());
assertEquals(500, list.get().getTotalLength());
- list = new DiskRangeListCreateHelper();
+ list = new CreateHelper();
list.addOrMerge(0, 100, true, false);
list.addOrMerge(100, 200, true, false);
list.addOrMerge(200, 300, false, false);
@@ -270,35 +277,35 @@ public class TestLowLevelCacheImpl {
LowLevelCacheImpl cache = new LowLevelCacheImpl(metrics,
new DummyCachePolicy(), new DummyAllocator(), true, -1); // no cleanup thread
long fn = 1;
- LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[]{fb(), fb(), fb()};
+ MemoryBuffer[] fakes = new MemoryBuffer[]{fb(), fb(), fb()};
cache.putFileData(fn, new DiskRange[]{dr(0, 100), dr(300, 500), dr(800, 1000)},
- fakes, 0, Priority.NORMAL);
+ fakes, 0, Priority.NORMAL, null);
assertEquals(0, metrics.getCacheRequestedBytes());
assertEquals(0, metrics.getCacheHitBytes());
- list = new DiskRangeListCreateHelper();
+ list = new CreateHelper();
list.addOrMerge(0, 1000, true, false);
- cache.getFileData(fn, list.get(), 0, testFactory);
+ cache.getFileData(fn, list.get(), 0, testFactory, null, null);
assertEquals(1000, metrics.getCacheRequestedBytes());
assertEquals(500, metrics.getCacheHitBytes());
- list = new DiskRangeListCreateHelper();
+ list = new CreateHelper();
list.addOrMerge(0, 100, true, false);
- cache.getFileData(fn, list.get(), 0, testFactory);
+ cache.getFileData(fn, list.get(), 0, testFactory, null, null);
assertEquals(1100, metrics.getCacheRequestedBytes());
assertEquals(600, metrics.getCacheHitBytes());
- list = new DiskRangeListCreateHelper();
+ list = new CreateHelper();
list.addOrMerge(0, 100, true, false);
list.addOrMerge(300, 500, true, false);
list.addOrMerge(800, 1000, true, false);
- cache.getFileData(fn, list.get(), 0, testFactory);
+ cache.getFileData(fn, list.get(), 0, testFactory, null, null);
assertEquals(1600, metrics.getCacheRequestedBytes());
assertEquals(1100, metrics.getCacheHitBytes());
- list = new DiskRangeListCreateHelper();
+ list = new CreateHelper();
list.addOrMerge(300, 500, true, false);
list.addOrMerge(1000, 2000, true, false);
- cache.getFileData(fn, list.get(), 0, testFactory);
+ cache.getFileData(fn, list.get(), 0, testFactory, null, null);
assertEquals(2800, metrics.getCacheRequestedBytes());
assertEquals(1300, metrics.getCacheHitBytes());
}
@@ -325,12 +332,12 @@ public class TestLowLevelCacheImpl {
if (isGet) {
int[] offsets = new int[count];
count = generateOffsets(offsetsToUse, rdm, offsets);
- DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+ CreateHelper list = new CreateHelper();
for (int j = 0; j < count; ++j) {
list.addOrMerge(offsets[j], offsets[j] + 1, true, false);
}
- DiskRangeList iter = cache.getFileData(fileName, list.get(), 0, testFactory);
+ DiskRangeList iter = cache.getFileData(fileName, list.get(), 0, testFactory, null, null);
int j = -1;
while (iter != null) {
++j;
@@ -339,7 +346,7 @@ public class TestLowLevelCacheImpl {
continue;
}
++gets;
- LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).buffer;
+ LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)iter).getBuffer();
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
cache.releaseBuffer(result);
iter = iter.next;
@@ -352,13 +359,13 @@ public class TestLowLevelCacheImpl {
ranges[j] = dr(next, next + 1);
offsets[j] = next;
}
- LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count];
+ MemoryBuffer[] buffers = new MemoryBuffer[count];
for (int j = 0; j < offsets.length; ++j) {
LlapDataBuffer buf = LowLevelCacheImpl.allocateFake();
buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
buffers[j] = buf;
}
- long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL);
+ long[] mask = cache.putFileData(fileName, ranges, buffers, 0, Priority.NORMAL, null);
puts += buffers.length;
long maskVal = 0;
if (mask != null) {
@@ -396,14 +403,14 @@ public class TestLowLevelCacheImpl {
DiskRangeList head = new DiskRangeList(0, offsetsToUse + 1);
isFirstFile = !isFirstFile;
long fileId = isFirstFile ? fn1 : fn2;
- head = cache.getFileData(fileId, head, 0, testFactory);
+ head = cache.getFileData(fileId, head, 0, testFactory, null, null);
DiskRange[] results = head.listToArray();
int startIndex = rdm.nextInt(results.length), index = startIndex;
LlapDataBuffer victim = null;
do {
DiskRange r = results[index];
if (r instanceof CacheChunk) {
- LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).buffer;
+ LlapDataBuffer result = (LlapDataBuffer)((CacheChunk)r).getBuffer();
cache.releaseBuffer(result);
if (victim == null && result.invalidate()) {
++evictions;
@@ -452,7 +459,7 @@ public class TestLowLevelCacheImpl {
}
}
- private void evict(LowLevelCacheImpl cache, LlapMemoryBuffer fake) {
+ private void evict(LowLevelCacheImpl cache, MemoryBuffer fake) {
LlapDataBuffer victimBuffer = (LlapDataBuffer)fake;
int refCount = victimBuffer.getRefCount();
for (int i = 0; i < refCount; ++i) {
@@ -462,14 +469,14 @@ public class TestLowLevelCacheImpl {
cache.notifyEvicted(victimBuffer);
}
- private void verifyRefcount(LlapMemoryBuffer[] fakes, int... refCounts) {
+ private void verifyRefcount(MemoryBuffer[] fakes, int... refCounts) {
for (int i = 0; i < refCounts.length; ++i) {
assertEquals("At " + i, refCounts[i], ((LlapDataBuffer)fakes[i]).getRefCount());
}
}
- private LlapMemoryBuffer[] fbs(LlapMemoryBuffer[] fakes, int... indexes) {
- LlapMemoryBuffer[] rv = new LlapMemoryBuffer[indexes.length];
+ private MemoryBuffer[] fbs(MemoryBuffer[] fakes, int... indexes) {
+ MemoryBuffer[] rv = new MemoryBuffer[indexes.length];
for (int i = 0; i < indexes.length; ++i) {
rv[i] = (indexes[i] == -1) ? null : fakes[indexes[i]];
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 7349208..a423eeb 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -33,7 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.junit.Assume;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 3549fd9..2886d54 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
index 8b61fe4..8dc62d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/DebugUtils.java
@@ -35,10 +35,6 @@ public class DebugUtils {
return isTraceOrcEnabled; // TODO: temporary, should be hardcoded false
}
- public static boolean isTraceRangesEnabled() {
- return false;
- }
-
public static boolean isTraceLockingEnabled() {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
new file mode 100644
index 0000000..0095d31
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
+public class HdfsUtils {
+ private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
+ private static final Log LOG = LogFactory.getLog(HdfsUtils.class);
+
+ public static long getFileId(FileSystem fileSystem, Path path) throws IOException {
+ String pathStr = path.toUri().getPath();
+ if (fileSystem instanceof DistributedFileSystem) {
+ return SHIMS.getFileId(fileSystem, pathStr);
+ }
+ // If we are not on DFS, we just hash the file name + size and hope for the best.
+ // TODO: we assume it only happens in tests. Fix?
+ int nameHash = pathStr.hashCode();
+ long fileSize = fileSystem.getFileStatus(path).getLen();
+ long id = ((fileSize ^ (fileSize >>> 32)) << 32) | ((long)nameHash & 0xffffffffL);
+ LOG.warn("Cannot get unique file ID from "
+ + fileSystem.getClass().getSimpleName() + "; using " + id + "(" + pathStr
+ + "," + nameHash + "," + fileSize + ")");
+ return id;
+ }
+
+ // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
+ // is still going to work. Otherwise, file IDs can be turned off. Later, we should use
+ // a public utility method in HDFS to obtain the inode-based path.
+ private static String HDFS_ID_PATH_PREFIX = "/.reserved/.inodes/";
+
+ public static Path getFileIdPath(
+ FileSystem fileSystem, Path path, long fileId) {
+ return (fileSystem instanceof DistributedFileSystem)
+ ? new Path(HDFS_ID_PATH_PREFIX + fileId) : path;
+ }
+}
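
A small usage sketch of the new utility, under the assumption that the caller already has a Configuration and a Path; the wrapper class name below is made up for illustration.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HdfsUtils;

class FileIdExample {
  static Path toFileIdPath(Configuration conf, Path path) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    long fileId = HdfsUtils.getFileId(fs, path);       // inode ID on HDFS, name+size hash elsewhere
    // On HDFS this resolves to /.reserved/.inodes/<fileId>; otherwise the original path comes back.
    return HdfsUtils.getFileIdPath(fs, path, fileId);
  }
}
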
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java
index 3d2525b..a5cead4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReader.java
@@ -3,18 +3,20 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
public interface EncodedReader {
+ // TODO: document
void readEncodedColumns(int stripeIx, StripeInformation stripe,
RowIndex[] index, List<ColumnEncoding> encodings, List<Stream> streams,
- boolean[] included, boolean[][] colRgs) throws IOException;
+ boolean[] included, boolean[][] colRgs,
+ Consumer<OrcEncodedColumnBatch> consumer) throws IOException;
void close() throws IOException;
+
+ void setDebugTracing(boolean isEnabled);
}
\ No newline at end of file
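
Since readEncodedColumns now takes a Consumer instead of the LLAP-specific types, the producer pushes batches to whatever consumer is supplied. A minimal consumer sketch is below, assuming the relocated Consumer interface keeps the consumeData/setDone/setError methods of the deleted llap-client copy shown further down; the class name and logging are illustrative only.

import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;

class LoggingConsumer implements Consumer<OrcEncodedColumnBatch> {
  @Override
  public void consumeData(OrcEncodedColumnBatch batch) {
    // A real consumer (e.g. OrcEncodedDataConsumer) decodes the batch into vectorized rows.
    System.out.println("Received encoded batch " + batch.getBatchKey());
  }
  @Override
  public void setDone() {
    System.out.println("Producer finished");
  }
  @Override
  public void setError(Throwable t) {
    t.printStackTrace();
  }
}
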
[4/4] hive git commit: HIVE-11259 : LLAP: clean up ORC dependencies
part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-11259 : LLAP: clean up ORC dependencies part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e3b59d3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e3b59d3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e3b59d3
Branch: refs/heads/llap
Commit: 1e3b59d3714810bb8a75420607e10ff266b426b4
Parents: 5cd092b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Jul 27 13:48:33 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Jul 27 13:48:33 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/DiskRangeList.java | 9 +-
.../hive/common/io/storage_api/Allocator.java | 51 +
.../hive/common/io/storage_api/DataCache.java | 101 ++
.../hive/common/io/storage_api/DataReader.java | 60 +
.../io/storage_api/EncodedColumnBatch.java | 139 +++
.../common/io/storage_api/MemoryBuffer.java | 28 +
.../org/apache/hadoop/hive/ql/QTestUtil.java | 1 -
.../org/apache/hadoop/hive/llap/Consumer.java | 30 -
.../hadoop/hive/llap/cache/Allocator.java | 34 -
.../llap/counters/LowLevelCacheCounters.java | 26 -
.../hive/llap/io/api/EncodedColumnBatch.java | 96 --
.../hadoop/hive/llap/io/api/LlapIoProxy.java | 2 +-
.../llap/io/api/cache/LlapMemoryBuffer.java | 30 -
.../hive/llap/io/api/cache/LowLevelCache.java | 96 --
.../hive/llap/io/api/orc/OrcBatchKey.java | 60 -
.../hive/llap/io/api/orc/OrcCacheKey.java | 58 -
.../hadoop/hive/llap/cache/BuddyAllocator.java | 32 +-
.../apache/hadoop/hive/llap/cache/Cache.java | 6 +-
.../hive/llap/cache/EvictionAwareAllocator.java | 5 +-
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 4 +-
.../hadoop/hive/llap/cache/LowLevelCache.java | 76 ++
.../hive/llap/cache/LowLevelCacheCounters.java | 26 +
.../hive/llap/cache/LowLevelCacheImpl.java | 73 +-
.../hive/llap/cache/LowLevelCachePolicy.java | 2 +-
.../llap/cache/LowLevelFifoCachePolicy.java | 2 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 2 +-
.../hadoop/hive/llap/cache/NoopCache.java | 6 +-
.../llap/counters/QueryFragmentCounters.java | 1 +
.../hive/llap/io/api/impl/LlapInputFormat.java | 2 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 4 +-
.../llap/io/decode/ColumnVectorProducer.java | 2 +-
.../llap/io/decode/EncodedDataConsumer.java | 10 +-
.../llap/io/decode/OrcColumnVectorProducer.java | 6 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 27 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 137 ++-
.../hive/llap/io/metadata/OrcMetadataCache.java | 7 +-
.../llap/io/metadata/OrcStripeMetadata.java | 2 +-
.../hive/llap/cache/TestBuddyAllocator.java | 22 +-
.../TestIncrementalObjectSizeEstimator.java | 2 +-
.../hive/llap/cache/TestLowLevelCacheImpl.java | 119 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 2 +-
.../hive/llap/cache/TestOrcMetadataCache.java | 2 +-
.../org/apache/hadoop/hive/llap/DebugUtils.java | 4 -
.../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 61 +
.../hadoop/hive/ql/io/orc/EncodedReader.java | 12 +-
.../hive/ql/io/orc/EncodedReaderImpl.java | 1118 +++++++++++++++---
.../ql/io/orc/EncodedTreeReaderFactory.java | 203 ++--
.../apache/hadoop/hive/ql/io/orc/InStream.java | 947 +--------------
.../hive/ql/io/orc/MetadataReaderImpl.java | 6 +-
.../apache/hadoop/hive/ql/io/orc/Reader.java | 15 +-
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 28 +-
.../hadoop/hive/ql/io/orc/RecordReader.java | 8 -
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 71 +-
.../hive/ql/io/orc/RecordReaderUtils.java | 105 +-
.../hadoop/hive/ql/io/orc/StreamUtils.java | 12 +-
.../hive/ql/io/orc/TreeReaderFactory.java | 20 +-
.../hadoop/hive/ql/io/orc/llap/Consumer.java | 30 +
.../hadoop/hive/ql/io/orc/llap/OrcBatchKey.java | 60 +
.../hadoop/hive/ql/io/orc/llap/OrcCacheKey.java | 58 +
59 files changed, 2205 insertions(+), 1953 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java b/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
index 250f901..4fa72a2 100644
--- a/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
@@ -152,10 +152,8 @@ public class DiskRangeList extends DiskRange {
return result;
}
- public static class DiskRangeListCreateHelper {
+ public static class CreateHelper {
private DiskRangeList tail = null, head;
- public DiskRangeListCreateHelper() {
- }
public DiskRangeList getTail() {
return tail;
@@ -175,7 +173,6 @@ public class DiskRangeList extends DiskRange {
}
}
-
public DiskRangeList get() {
return head;
}
@@ -192,8 +189,8 @@ public class DiskRangeList extends DiskRange {
* and thus remains constant even if head is replaced with some new range via in-place list
* mutation. extract() can be used to obtain the modified list.
*/
- public static class DiskRangeListMutateHelper extends DiskRangeList {
- public DiskRangeListMutateHelper(DiskRangeList head) {
+ public static class MutateHelper extends DiskRangeList {
+ public MutateHelper(DiskRangeList head) {
super(-1, -1);
assert head != null;
assert head.prev == null;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java
new file mode 100644
index 0000000..0814fe7
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io.storage_api;
+
+/** An allocator provided externally to storage classes to allocate MemoryBuffer-s. */
+public interface Allocator {
+ public static class AllocatorOutOfMemoryException extends RuntimeException {
+ public AllocatorOutOfMemoryException(String msg) {
+ super(msg);
+ }
+
+ private static final long serialVersionUID = 268124648177151761L;
+ }
+
+ /**
+ * Allocates multiple buffers of a given size.
+ * @param dest Array where buffers are placed. Objects are reused if already there
+ * (see createUnallocated), created otherwise.
+ * @param size Allocation size.
+ * @throws AllocatorOutOfMemoryException Cannot allocate.
+ */
+ void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException;
+
+ /**
+ * Creates an unallocated memory buffer object. This object can be passed to allocateMultiple
+ * to allocate; this is useful if data structures are created for separate buffers that can
+ * later be allocated together.
+ */
+ MemoryBuffer createUnallocated();
+ /** Deallocates a memory buffer. */
+ void deallocate(MemoryBuffer buffer);
+ /** Whether the allocator uses direct buffers. */
+ boolean isDirectAlloc();
+ /** Maximum allocation size supported by this allocator. */
+ int getMaxAllocation();
+}
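
A hedged sketch of the intended allocation pattern, mirroring what TestBuddyAllocator above does: pre-create placeholder buffers, back them with memory in one call, and be prepared for AllocatorOutOfMemoryException. The class and method names here are illustrative.

import org.apache.hadoop.hive.common.io.storage_api.Allocator;
import org.apache.hadoop.hive.common.io.storage_api.Allocator.AllocatorOutOfMemoryException;
import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

class AllocationExample {
  static MemoryBuffer[] allocate(Allocator allocator, int count, int size) {
    MemoryBuffer[] buffers = new MemoryBuffer[count];
    for (int i = 0; i < count; ++i) {
      buffers[i] = allocator.createUnallocated();  // placeholder objects, no memory yet
    }
    try {
      allocator.allocateMultiple(buffers, size);   // backs every element with memory
    } catch (AllocatorOutOfMemoryException e) {
      // The caller decides whether to evict, retry with a smaller request, or fail the read.
      throw e;
    }
    return buffers;
  }
}
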
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java
new file mode 100644
index 0000000..0ec67ea
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+
+/** An abstract data cache that IO formats can use to retrieve and cache data. */
+public interface DataCache {
+ public static final class BooleanRef {
+ public boolean value;
+ }
+
+ /** Disk range factory used during cache retrieval. */
+ public interface DiskRangeListFactory {
+ DiskRangeList createCacheChunk(MemoryBuffer buffer, long startOffset, long endOffset);
+ }
+
+ /**
+ * Gets file data for particular offsets. The range list is modified in place; it is then
+ * returned (since the list head could have changed). Ranges are replaced with cached ranges.
+ *
+ * Any such buffer is locked in cache to prevent eviction, and must therefore be released
+ * back to cache via a corresponding call (releaseBuffer) when the caller is done with it.
+ *
+ * In case of partial overlap with cached data, full cache blocks are always returned;
+ * there's no capacity for partial matches in the return type. The rules are as follows:
+ * 1) If the requested range starts in the middle of a cached range, that cached range will not
+ * be returned by default (e.g. if [100,200) and [200,300) are cached, the request for
+ * [150,300) will only return [200,300) from cache). This may be configurable in impls.
+ * This is because we assume well-known range start offsets are used (rg/stripe offsets), so
+ * a request starting in the middle of a cached range doesn't make sense.
+ * 2) If the requested range ends in the middle of a cached range, that entire cached range will
+ * be returned (e.g. if [100,200) and [200,300) are cached, the request for [100,250) will
+ * return both ranges). It should really be same as #1, however currently ORC uses estimated
+ * end offsets; if we don't return the end block, the caller may read it from disk needlessly.
+ *
+ * @param fileId Unique ID of the target file on the file system.
+ * @param range A set of DiskRange-s (linked list) that is to be retrieved. May be modified.
+ * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
+ * @param factory A factory to produce DiskRangeList-s out of cached MemoryBuffer-s.
+ * @param gotAllData An out param - whether all the requested data was found in cache.
+ * @return The new or modified list of DiskRange-s, where some ranges may contain cached data.
+ */
+ DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+ DiskRangeListFactory factory, BooleanRef gotAllData);
+
+ /**
+ * Puts file data into cache, or gets older data in case of collisions.
+ *
+ * The memory buffers provided MUST be allocated via an allocator returned by getAllocator
+ * method, to allow cache implementations that evict and then de-allocate the buffer.
+ *
+ * It is assumed that the caller will use the data immediately, therefore any buffers provided
+ * to putFileData (or returned due to cache collision) are locked in cache to prevent eviction,
+ * and must therefore be released back to cache via a corresponding call (releaseBuffer) when the
+ * caller is done with it. Buffers rejected due to conflict will neither be locked, nor
+ * automatically deallocated. The caller must take care to discard these buffers.
+ *
+ * @param fileId Unique ID of the target file on the file system.
+ * @param ranges The ranges for which the data is being cached. These objects will not be stored.
+ * @param data The data for the corresponding ranges.
+ * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
+ * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+ * the replacement chunks from cache are updated directly in the array.
+ */
+ long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset);
+
+ /**
+ * Releases the buffer returned by getFileData/provided to putFileData back to cache.
+ * See respective javadocs for details.
+ */
+ void releaseBuffer(MemoryBuffer buffer);
+
+ /**
+ * Notifies the cache that the buffer returned from getFileData/provided to putFileData will
+ * be used by another consumer and therefore released multiple times (one more time per call).
+ */
+ void reuseBuffer(MemoryBuffer buffer);
+
+ /**
+ * Gets the allocator associated with this DataCache.
+ */
+ Allocator getAllocator();
+}
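
To make the putFileData contract above concrete, here is a sketch of the put-and-release flow under the stated rules: collided slots come back as locked cached copies, while the rejected originals must be discarded by the caller. Names are illustrative; the real logic is in OrcEncodedDataReader/EncodedReaderImpl.

import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.io.storage_api.DataCache;
import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

class PutFileDataExample {
  static void cacheAndRelease(DataCache cache, long fileId, DiskRange[] ranges,
      MemoryBuffer[] data, long baseOffset) {
    MemoryBuffer[] original = data.clone();              // remember what we supplied
    long[] mask = cache.putFileData(fileId, ranges, data, baseOffset);
    if (mask != null) {
      // A set bit for slot i means slot i collided: data[i] now holds the locked cached copy,
      // and original[i] is neither locked nor freed, so the caller must discard it, e.g. via
      // cache.getAllocator().deallocate(original[i]).
    }
    for (MemoryBuffer buf : data) {
      cache.releaseBuffer(buf);                          // unlock once the data has been used
    }
  }
}
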
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java
new file mode 100644
index 0000000..0e11e4e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+
+/** An abstract data reader that IO formats can use to read bytes from underlying storage. */
+public interface DataReader {
+
+ /** Opens the DataReader, making it ready to use. */
+ void open() throws IOException;
+
+ /** Closes the DataReader. */
+ void close() throws IOException;
+
+ /** Reads the data.
+ *
+ * Note that for cases such as zero-copy read, the caller must release the disk ranges
+ * produced after being done with them. Call isTrackingDiskRanges to find out if this is needed.
+ * @param range List of disk ranges to read. Ranges with data will be ignored.
+ * @param baseOffset Base offset from the start of the file of the ranges in disk range list.
+ * @param doForceDirect Whether the data should be read into direct buffers.
+ * @return New or modified list of DiskRange-s, where all the ranges are filled with data.
+ */
+ DiskRangeList readFileData(
+ DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException;
+
+
+ /**
+ * Whether the user should release buffers created by readFileData. See readFileData javadoc.
+ */
+ boolean isTrackingDiskRanges();
+
+ /**
+ * Releases buffers created by readFileData. See readFileData javadoc.
+ * @param toRelease The buffer to release.
+ */
+ void releaseBuffer(ByteBuffer toRelease);
+}
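
A short lifecycle sketch for the interface above: open once, read, and close, releasing buffers only when the reader reports it is tracking them (e.g. the zero-copy path). The class name is made up; buffer handling is only hinted at in comments because the exact buffer extraction depends on the DiskRange subclasses involved.

import java.io.IOException;
import org.apache.hadoop.hive.common.DiskRangeList;
import org.apache.hadoop.hive.common.io.storage_api.DataReader;

class DataReaderLifecycle {
  static void readOnce(DataReader reader, DiskRangeList ranges, long baseOffset) throws IOException {
    reader.open();
    try {
      DiskRangeList data = reader.readFileData(ranges, baseOffset, false);
      // ... consume the bytes in 'data' ...
      if (reader.isTrackingDiskRanges()) {
        // Zero-copy readers own the buffers they hand out; every ByteBuffer taken from the
        // returned ranges must be handed back via reader.releaseBuffer(byteBuffer).
      }
    } finally {
      reader.close();
    }
  }
}
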
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java
new file mode 100644
index 0000000..d51e3b4
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A block of data for a given section of a file, similar to VRB but in encoded form.
+ * Stores a set of buffers for each encoded stream that is a part of each column.
+ */
+public class EncodedColumnBatch<BatchKey> {
+ /**
+ * Slice of the data for a stream of some column, stored inside MemoryBuffer-s.
+ * ColumnStreamData can be reused for many EncodedColumnBatch-es (e.g. dictionary stream), so
+ * it tracks the number of such users via a refcount.
+ */
+ public static class ColumnStreamData {
+ private List<MemoryBuffer> cacheBuffers;
+ /** Base offset from the beginning of the indexable unit; for example, for ORC,
+ * the offset from the CB in a compressed file, or from the stream in an uncompressed file. */
+ private int indexBaseOffset;
+ /** Stream type; format-specific. */
+ private int streamKind;
+
+ /** Reference count. */
+ private AtomicInteger refCount = new AtomicInteger(0);
+
+ public void init(int kind) {
+ streamKind = kind;
+ indexBaseOffset = 0;
+ }
+
+ public void reset() {
+ cacheBuffers.clear();
+ refCount.set(0);
+ }
+
+ public void incRef() {
+ refCount.incrementAndGet();
+ }
+
+ public int decRef() {
+ int i = refCount.decrementAndGet();
+ assert i >= 0;
+ return i;
+ }
+
+ public List<MemoryBuffer> getCacheBuffers() {
+ return cacheBuffers;
+ }
+
+ public void setCacheBuffers(List<MemoryBuffer> cacheBuffers) {
+ this.cacheBuffers = cacheBuffers;
+ }
+
+ public int getIndexBaseOffset() {
+ return indexBaseOffset;
+ }
+
+ public void setIndexBaseOffset(int indexBaseOffset) {
+ this.indexBaseOffset = indexBaseOffset;
+ }
+
+ public int getStreamKind() {
+ return streamKind;
+ }
+ }
+
+ /** The key that is used to map this batch to source location. */
+ protected BatchKey batchKey;
+ /** Stream data for each stream, for each included column. */
+ protected ColumnStreamData[][] columnData;
+ /** Column indexes included in the batch. Correspond to columnData elements. */
+ protected int[] columnIxs;
+ // TODO: Maybe remove when solving the pooling issue.
+ /** Generation version necessary to sync pooling reuse with the fact that two separate threads
+ * operate on batches - the one that decodes them, and potential separate thread w/a "stop" call
+ * that cleans them up. We don't want the decode thread to use the ECB that was thrown out and
+ * reused, so it remembers the version and checks it after making sure no cleanup thread can ever
+ * get to this ECB anymore. All this sync is ONLY needed because of high level cache code. */
+ public int version = Integer.MIN_VALUE;
+
+ public void reset() {
+ if (columnData != null) {
+ for (int i = 0; i < columnData.length; ++i) {
+ columnData[i] = null;
+ }
+ }
+ }
+
+ public void initColumn(int colIxMod, int colIx, int streamCount) {
+ columnIxs[colIxMod] = colIx;
+ columnData[colIxMod] = new ColumnStreamData[streamCount];
+ }
+
+ public void setStreamData(int colIxMod, int streamIx, ColumnStreamData sb) {
+ columnData[colIxMod][streamIx] = sb;
+ }
+
+ public void setAllStreamsData(int colIxMod, int colIx, ColumnStreamData[] sbs) {
+ columnIxs[colIxMod] = colIx;
+ columnData[colIxMod] = sbs;
+ }
+
+ public BatchKey getBatchKey() {
+ return batchKey;
+ }
+
+ public ColumnStreamData[][] getColumnData() {
+ return columnData;
+ }
+
+ public int[] getColumnIxs() {
+ return columnIxs;
+ }
+
+ protected void resetColumnArrays(int columnCount) {
+ if (columnIxs != null && columnCount == columnIxs.length) return;
+ columnIxs = new int[columnCount];
+ columnData = new ColumnStreamData[columnCount][];
+ }
+}
\ No newline at end of file
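
Because ColumnStreamData can be shared across batches (the dictionary-stream case mentioned above), its lifetime is governed by the refcount. A small sketch of that protocol follows; the stream kind value and buffer source are placeholders.

import java.util.ArrayList;
import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

class StreamDataRefCounting {
  static void share(MemoryBuffer dictionaryBuffer) {
    ColumnStreamData dict = new ColumnStreamData();
    dict.init(3);                                        // 3 = placeholder stream kind
    dict.setCacheBuffers(new ArrayList<MemoryBuffer>());
    dict.getCacheBuffers().add(dictionaryBuffer);
    dict.incRef();                                       // first batch references the stream
    dict.incRef();                                       // second batch shares it
    if (dict.decRef() > 0) {
      // first consumer done, but the stream stays alive for the other batch
    }
    if (dict.decRef() == 0) {
      dict.reset();                                      // last consumer: buffers can go back to the pool
    }
  }
}
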
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java
new file mode 100644
index 0000000..4dd2f09
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import java.nio.ByteBuffer;
+
+/** Abstract interface for any class wrapping a ByteBuffer. */
+public interface MemoryBuffer {
+ /** Note - raw buffer should not be modified. */
+ public ByteBuffer getByteBufferRaw();
+ public ByteBuffer getByteBufferDup();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index a0e0c94..c9ed0d4 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -82,7 +82,6 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java
deleted file mode 100644
index 9db9f75c..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-/**
- * Data consumer; an equivalent of a data queue for an asynchronous data producer.
- */
-public interface Consumer<T> {
- /** Some data has been produced. */
- public void consumeData(T data);
- /** No more data will be produced; done */
- public void setDone();
- /** No more data will be produced; error during production */
- public void setError(Throwable t);
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
deleted file mode 100644
index 4e990ef..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.cache;
-
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-
-public interface Allocator {
- public static class LlapCacheOutOfMemoryException extends RuntimeException {
- public LlapCacheOutOfMemoryException(String msg) {
- super(msg);
- }
-
- private static final long serialVersionUID = 268124648177151761L;
- }
- void allocateMultiple(LlapMemoryBuffer[] dest, int size) throws LlapCacheOutOfMemoryException;
- void deallocate(LlapMemoryBuffer buffer);
- boolean isDirectAlloc();
- int getMaxAllocation();
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java b/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
deleted file mode 100644
index d862a83..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.counters;
-
-public interface LowLevelCacheCounters {
- void recordCacheHit(long bytesHit);
- void recordCacheMiss(long bytesMissed);
- void recordAllocBytes(long bytesWasted, long bytesAllocated);
- void recordHdfsTime(long timeUs);
- long startTimeCounter();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
deleted file mode 100644
index 9734fc0..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-
-public class EncodedColumnBatch<BatchKey> {
- // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
- // generality, and ability to not copy data from underlying low-level cached buffers.
- public static class StreamBuffer {
- // Decoder knows which stream this belongs to, and each buffer is a compression block,
- // so he can figure out the offsets from metadata.
- public List<LlapMemoryBuffer> cacheBuffers;
- public int streamKind;
- /** Base offset from the beginning of the indexable unit;
- * CB in compressed, stream in uncompressed file. */
- public int indexBaseOffset;
-
- // StreamBuffer can be reused for many RGs (e.g. dictionary case). To avoid locking every
- // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
- public AtomicInteger refCount = new AtomicInteger(0);
-
- public void init(int kind) {
- streamKind = kind;
- indexBaseOffset = 0;
- }
-
- public void reset() {
- cacheBuffers.clear();
- refCount.set(0);
- }
-
- public void incRef() {
- refCount.incrementAndGet();
- }
-
- public int decRef() {
- int i = refCount.decrementAndGet();
- assert i >= 0;
- return i;
- }
- }
-
- public BatchKey batchKey;
- public StreamBuffer[][] columnData;
- public int[] columnIxs;
- /** Generation version necessary to sync pooling reuse with the fact that two separate threads
- * operate on batches - the one that decodes them, and potential separate thread w/a "stop" call
- * that cleans them up. We don't want the decode thread to use the ECB that was thrown out and
- * reused, so it remembers the version and checks it after making sure no cleanup thread can ever
- * get to this ECB anymore. All this sync is ONLY needed because of high level cache code (sync
- * in decode thread is for the map that combines columns coming from cache and from file), so
- * if we throw this presently-unused code out, we'd be able to get rid of this. */
- public int version = Integer.MIN_VALUE;
-
- public void reset() {
- if (columnData != null) {
- for (int i = 0; i < columnData.length; ++i) {
- columnData[i] = null;
- }
- }
- }
-
- public void initColumn(int colIxMod, int colIx, int streamCount) {
- columnIxs[colIxMod] = colIx;
- columnData[colIxMod] = new StreamBuffer[streamCount];
- }
-
- public void setStreamData(int colIxMod, int streamIx, StreamBuffer sb) {
- columnData[colIxMod][streamIx] = sb;
- }
-
- public void setAllStreams(int colIxMod, int colIx, StreamBuffer[] sbs) {
- columnIxs[colIxMod] = colIx;
- columnData[colIxMod] = sbs;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
index c48af7b..4c31e32 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
@@ -28,7 +28,7 @@ public class LlapIoProxy {
private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
// Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
- // singleton once (on daemon startup); the said singleton server as the IO interface.
+ // singleton once (on daemon startup); the said singleton serves as the IO interface.
private static LlapIo io = null;
private static boolean isDaemon = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
deleted file mode 100644
index 5ec4d63..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.cache;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.metrics2.MetricsSource;
-
-
-public interface LlapMemoryBuffer {
- /** Note - position of the raw buffer should NOT be modified ever;
- * limit should not be modified after it's in cache. */
- public ByteBuffer getByteBufferRaw();
- public ByteBuffer getByteBufferDup();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
deleted file mode 100644
index fcc7eed..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.cache;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.common.DiskRange;
-import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
-import org.apache.hadoop.hive.llap.cache.Allocator;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-
-public interface LowLevelCache {
- public enum Priority {
- NORMAL,
- HIGH
- }
-
- public class CacheListHelper extends DiskRangeListMutateHelper {
- public CacheListHelper(DiskRangeList head) {
- super(head);
- }
-
- /** Workaround for Java's limitations, used to return stuff from getFileData. */
- public boolean didGetAllData;
- }
-
- /**
- * Gets file data for particular offsets. The range list is modified in place; it is then
- * returned (since the list head could have changed). Ranges are replaced with cached ranges.
- * In case of partial overlap with cached data, full cache blocks are always returned;
- * there's no capacity for partial matches in return type. The rules are as follows:
- * 1) If the requested range starts in the middle of a cached range, that cached range will not
- * be returned by default (e.g. if [100,200) and [200,300) are cached, the request for
- * [150,300) will only return [200,300) from cache). This may be configurable in impls.
- * This is because we assume well-known range start offsets are used (rg/stripe offsets), so
- * a request from the middle of the start doesn't make sense.
- * 2) If the requested range ends in the middle of a cached range, that entire cached range will
- * be returned (e.g. if [100,200) and [200,300) are cached, the request for [100,250) will
- * return both ranges). It should really be same as #1, however currently ORC uses estimated
- * end offsets; we do in fact know in such cases that partially-matched cached block (rg)
- * can be thrown away, the reader will never touch it; but we need code in the reader to
- * handle such cases to avoid disk reads for these "tails" vs real unmatched ranges.
- * Some sort of InvalidCacheChunk could be placed to avoid them. TODO
- * @param base base offset for the ranges (stripe/stream offset in case of ORC).
- */
- DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
- CacheChunkFactory factory, LowLevelCacheCounters qfCounters);
-
- DiskRangeList getFileData(
- long fileId, DiskRangeList range, long baseOffset, CacheChunkFactory factory);
-
- /**
- * Puts file data into cache.
- * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
- * the replacement chunks from cache are updated directly in the array.
- */
- long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks,
- long base, Priority priority, LowLevelCacheCounters qfCounters);
-
- long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks,
- long base, Priority priority);
-
- Allocator getAllocator();
-
- /**
- * Releases the buffer returned by getFileData or allocateMultiple.
- */
- void releaseBuffer(LlapMemoryBuffer buffer);
-
- void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers);
-
- LlapMemoryBuffer createUnallocated();
-
- boolean notifyReused(LlapMemoryBuffer buffer);
-
- public interface CacheChunkFactory {
- DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long startOffset, long endOffset);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
deleted file mode 100644
index 4dae98f..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.orc;
-
-public class OrcBatchKey {
- public long file;
- public int stripeIx, rgIx;
-
- public OrcBatchKey(long file, int stripeIx, int rgIx) {
- set(file, stripeIx, rgIx);
- }
-
- public void set(long file, int stripeIx, int rgIx) {
- this.file = file;
- this.stripeIx = stripeIx;
- this.rgIx = rgIx;
- }
-
- @Override
- public String toString() {
- return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = prime + (int)(file ^ (file >>> 32));
- return (prime * result + rgIx) * prime + stripeIx;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof OrcBatchKey)) return false;
- OrcBatchKey other = (OrcBatchKey)obj;
- // Strings are interned and can thus be compared like this.
- return stripeIx == other.stripeIx && rgIx == other.rgIx && file == other.file;
- }
-
- @Override
- public OrcBatchKey clone() throws CloneNotSupportedException {
- return new OrcBatchKey(file, stripeIx, rgIx);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java
deleted file mode 100644
index 95efeba..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.orc;
-
-public class OrcCacheKey extends OrcBatchKey {
- public int colIx;
-
- public OrcCacheKey(long file, int stripeIx, int rgIx, int colIx) {
- super(file, stripeIx, rgIx);
- this.colIx = colIx;
- }
-
- public OrcCacheKey(OrcBatchKey batchKey, int colIx) {
- super(batchKey.file, batchKey.stripeIx, batchKey.rgIx);
- this.colIx = colIx;
- }
-
- public OrcBatchKey copyToPureBatchKey() {
- return new OrcBatchKey(file, stripeIx, rgIx);
- }
-
- @Override
- public String toString() {
- return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + ", rgIx " + colIx + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- return super.hashCode() * prime + colIx;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof OrcCacheKey)) return false;
- OrcCacheKey other = (OrcCacheKey)obj;
- // Strings are interned and can thus be compared like this.
- return stripeIx == other.stripeIx && rgIx == other.rgIx
- && file == other.file && other.colIx == colIx;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index fca6249..65854fc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
@@ -97,8 +97,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// TODO: would it make sense to return buffers asynchronously?
@Override
- public void allocateMultiple(LlapMemoryBuffer[] dest, int size)
- throws LlapCacheOutOfMemoryException {
+ public void allocateMultiple(MemoryBuffer[] dest, int size)
+ throws AllocatorOutOfMemoryException {
assert size > 0 : "size is " + size;
if (size > maxAllocation) {
throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -114,7 +114,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
int ix = 0;
for (int i = 0; i < dest.length; ++i) {
if (dest[i] != null) continue;
- dest[i] = new LlapDataBuffer(); // TODO: pool of objects?
+ dest[i] = createUnallocated(); // TODO: pool of objects?
}
// First try to quickly lock some of the correct-sized free lists and allocate from them.
int arenaCount = allocatedArenas.get();
@@ -173,20 +173,20 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
+ "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
- throw new LlapCacheOutOfMemoryException(msg);
+ throw new AllocatorOutOfMemoryException(msg);
}
@Override
- public void deallocate(LlapMemoryBuffer buffer) {
+ public void deallocate(MemoryBuffer buffer) {
deallocateInternal(buffer, true);
}
@Override
- public void deallocateEvicted(LlapMemoryBuffer buffer) {
+ public void deallocateEvicted(MemoryBuffer buffer) {
deallocateInternal(buffer, false);
}
- private void deallocateInternal(LlapMemoryBuffer buffer, boolean doReleaseMemory) {
+ private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
LlapDataBuffer buf = (LlapDataBuffer)buffer;
long memUsage = buf.getMemoryUsage();
metrics.decrCacheCapacityUsed(buf.byteBuffer.capacity());
@@ -303,7 +303,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
private int allocateFast(
- int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
if (!freeList.lock.tryLock()) return ix;
@@ -315,7 +315,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
private int allocateWithSplit(int arenaIx, int freeListIx,
- LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+ MemoryBuffer[] dest, int ix, int allocationSize) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
int remaining = -1;
@@ -404,7 +404,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
private int allocateWithExpand(
- int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
while (true) {
int arenaCount = allocatedArenas.get(), allocArenaCount = arenaCount;
if (arenaCount < 0) {
@@ -449,7 +449,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
- int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int freeListIx, MemoryBuffer[] dest, int ix, int size) {
int current = freeList.listHead;
while (current >= 0 && ix < dest.length) {
int offset = offsetFromHeaderIndex(current);
@@ -532,9 +532,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
}
- private static class Request {
-
- }
private static class FreeList {
ReentrantLock lock = new ReentrantLock(false);
int listHead = -1; // Index of where the buffer is; in minAllocation units
@@ -542,4 +539,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// blocks requested, to be able to wait for pending splits and reduce fragmentation.
// However, we are trying to increase fragmentation now, since we cater to single-size.
}
+
+ @Override
+ public MemoryBuffer createUnallocated() {
+ return new LlapDataBuffer();
+ }
}
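
The allocator now deals in the storage_api MemoryBuffer type and throws AllocatorOutOfMemoryException. A hypothetical usage sketch (not part of this patch), based only on the signatures visible in the BuddyAllocator diff above; AllocatorOutOfMemoryException is assumed to be nested in the new Allocator interface, mirroring the old LlapCacheOutOfMemoryException:

  import org.apache.hadoop.hive.common.io.storage_api.Allocator;
  import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

  // Hypothetical sketch of allocating a batch of equally-sized buffers.
  class AllocatorUsageSketch {
    static MemoryBuffer[] allocateBlocks(Allocator allocator, int count, int size)
        throws Allocator.AllocatorOutOfMemoryException {
      MemoryBuffer[] buffers = new MemoryBuffer[count];
      try {
        // Fills every null slot in the array with a buffer of the requested size.
        allocator.allocateMultiple(buffers, size);
      } catch (Allocator.AllocatorOutOfMemoryException e) {
        // Release whatever was allocated before propagating the failure.
        for (MemoryBuffer buf : buffers) {
          if (buf != null) {
            allocator.deallocate(buf);
          }
        }
        throw e;
      }
      return buffers;
    }
  }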
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
index 1fe339c..4d294b9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
/** Dummy interface for now, might be different. */
public interface Cache<CacheKey> {
- public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value);
- public StreamBuffer[] get(CacheKey key);
+ public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value);
+ public ColumnStreamData[] get(CacheKey key);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
index db08130..3baacfd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
/**
* An allocator that has additional, internal-only call to deallocate evicted buffer.
@@ -25,5 +26,5 @@ import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
* ourselves, so we set the value atomically to account for both eviction and the new demand.
*/
public interface EvictionAwareAllocator extends Allocator {
- void deallocateEvicted(LlapMemoryBuffer buffer);
+ void deallocateEvicted(MemoryBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index c8ea475..83eb0af 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -21,13 +21,13 @@ package org.apache.hadoop.hive.llap.cache;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import com.google.common.annotations.VisibleForTesting;
-public final class LlapDataBuffer extends LlapCacheableBuffer implements LlapMemoryBuffer {
+public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryBuffer {
// For now, we don't track refcount for metadata blocks, don't clear them, don't reuse them and
// basically rely on GC to remove them. So, refcount only applies to data blocks. If that
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
new file mode 100644
index 0000000..13944ff
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
+
+public interface LowLevelCache {
+ public enum Priority {
+ NORMAL,
+ HIGH
+ }
+
+ /**
+ * Gets file data for particular offsets. The range list is modified in place; it is then
+ * returned (since the list head could have changed). Ranges are replaced with cached ranges.
+ * In case of partial overlap with cached data, full cache blocks are always returned;
+ * there's no capacity for partial matches in return type. The rules are as follows:
+ * 1) If the requested range starts in the middle of a cached range, that cached range will not
+ * be returned by default (e.g. if [100,200) and [200,300) are cached, the request for
+ * [150,300) will only return [200,300) from cache). This may be configurable in impls.
+ * This is because we assume well-known range start offsets (rg/stripe offsets) are used, so
+ * a request that starts in the middle of a cached range doesn't make sense.
+ * 2) If the requested range ends in the middle of a cached range, that entire cached range will
+ * be returned (e.g. if [100,200) and [200,300) are cached, the request for [100,250) will
+ * return both ranges). This should really behave the same as #1; however, ORC currently uses
+ * estimated end offsets. In such cases we do know that the partially-matched cached block (rg)
+ * can be thrown away, since the reader will never touch it; but the reader needs extra code to
+ * handle such cases and avoid disk reads for these "tails", as opposed to real unmatched ranges.
+ * Some sort of InvalidCacheChunk could be placed to avoid them. TODO
+ * @param base base offset for the ranges (stripe/stream offset in case of ORC).
+ */
+ DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+ DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData);
+
+ /**
+ * Puts file data into cache.
+ * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+ * the replacement chunks from cache are updated directly in the array.
+ */
+ long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
+
+ Allocator getAllocator();
+
+ /**
+ * Releases the buffer returned by getFileData.
+ */
+ void releaseBuffer(MemoryBuffer buffer);
+
+ void releaseBuffers(List<MemoryBuffer> cacheBuffers);
+
+ boolean reuseBuffer(MemoryBuffer buffer);
+}
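
The javadoc above describes how cached pieces replace the requested ranges in place, and the old CacheListHelper workaround is replaced by an explicit BooleanRef out-parameter. A hypothetical caller sketch (not part of this patch), assuming DiskRangeList has an (offset, end) constructor and that DataCache.BooleanRef and DataCache.DiskRangeListFactory have the shapes implied by the diff:

  import org.apache.hadoop.hive.common.DiskRangeList;
  import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
  import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
  import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
  import org.apache.hadoop.hive.llap.cache.LowLevelCache;

  // Hypothetical sketch of a read-through-cache call under the new signature.
  class LowLevelCacheUsageSketch {
    static DiskRangeList readThroughCache(LowLevelCache cache, long fileId, long baseOffset) {
      // Request [100, 300) as a single, not-yet-cached range.
      DiskRangeList ranges = new DiskRangeList(100, 300);

      // Invoked for every cache hit; a real factory would return a chunk type that
      // keeps a reference to the MemoryBuffer (this placeholder drops it).
      DiskRangeListFactory factory = new DiskRangeListFactory() {
        @Override
        public DiskRangeList createCacheChunk(MemoryBuffer buffer, long start, long end) {
          return new DiskRangeList(start, end);
        }
      };

      BooleanRef gotAllData = new BooleanRef();
      // Cached pieces replace the matching ranges in place; the (possibly new) list
      // head is returned. Counters are optional, hence null here.
      DiskRangeList result =
          cache.getFileData(fileId, ranges, baseOffset, factory, null, gotAllData);
      if (!gotAllData.value) {
        // Some ranges were not in cache and would still have to be read from disk.
      }
      return result;
    }
  }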
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
new file mode 100644
index 0000000..a0810f0
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+public interface LowLevelCacheCounters {
+ void recordCacheHit(long bytesHit);
+ void recordCacheMiss(long bytesMissed);
+ void recordAllocBytes(long bytesWasted, long bytesAllocated);
+ void recordHdfsTime(long timeUs);
+ long startTimeCounter();
+}
\ No newline at end of file
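
The counters interface is small enough that a no-op implementation is trivial; this hypothetical sketch (not part of this patch) just satisfies the interface, e.g. for tests or callers that do not track per-fragment statistics:

  import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters;

  // Hypothetical no-op counters.
  class NoopLowLevelCacheCounters implements LowLevelCacheCounters {
    @Override public void recordCacheHit(long bytesHit) { }
    @Override public void recordCacheMiss(long bytesMissed) { }
    @Override public void recordAllocBytes(long bytesWasted, long bytesAllocated) { }
    @Override public void recordHdfsTime(long timeUs) { }
    @Override public long startTimeCounter() { return 0; }
  }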
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 3798aaa..6a54623 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -27,11 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
@@ -74,14 +75,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public DiskRangeList getFileData(
- long fileId, DiskRangeList ranges, long baseOffset, CacheChunkFactory factory) {
- return getFileData(fileId, ranges, baseOffset, factory, null);
- }
-
- @Override
public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset,
- CacheChunkFactory factory, LowLevelCacheCounters qfCounters) {
+ DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
if (ranges == null) return null;
DiskRangeList prev = ranges.prev;
FileCache subCache = cache.get(fileId);
@@ -91,25 +86,24 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
if (qfCounters != null) {
qfCounters.recordCacheMiss(totalMissed);
}
- if (prev != null && prev instanceof CacheListHelper) {
- ((CacheListHelper)prev).didGetAllData = false;
+ if (prev != null && gotAllData != null) {
+ gotAllData.value = false;
}
return ranges;
}
- CacheListHelper resultObj = null;
try {
if (prev == null) {
- prev = new DiskRangeListMutateHelper(ranges);
- } else if (prev instanceof CacheListHelper) {
- resultObj = (CacheListHelper)prev;
- resultObj.didGetAllData = true;
+ prev = new MutateHelper(ranges);
+ }
+ if (gotAllData != null) {
+ gotAllData.value = true;
}
DiskRangeList current = ranges;
while (current != null) {
metrics.incrCacheRequestedBytes(current.getLength());
// We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
DiskRangeList next = current.next;
- getOverlappingRanges(baseOffset, current, subCache.cache, factory, resultObj);
+ getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
current = next;
}
} finally {
@@ -134,8 +128,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
- ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory factory,
- CacheListHelper resultObj) {
+ ConcurrentSkipListMap<Long, LlapDataBuffer> cache, DiskRangeListFactory factory,
+ BooleanRef gotAllData) {
long absOffset = currentNotCached.getOffset() + baseOffset;
if (!doAssumeGranularBlocks) {
// This currently only happens in tests. See getFileData comment on the interface.
@@ -161,8 +155,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
if (!lockBuffer(buffer, true)) {
// If we cannot lock, remove this from cache and continue.
matches.remove();
- if (resultObj != null) {
- resultObj.didGetAllData = false;
+ if (gotAllData != null) {
+ gotAllData.value = false;
}
continue;
}
@@ -174,13 +168,13 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
cacheEnd = cacheOffset + buffer.declaredCachedLength;
DiskRangeList currentCached = factory.createCacheChunk(buffer,
cacheOffset - baseOffset, cacheEnd - baseOffset);
- currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, resultObj);
+ currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, gotAllData);
metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength()));
}
if (currentNotCached != null) {
assert !currentNotCached.hasData();
- if (resultObj != null) {
- resultObj.didGetAllData = false;
+ if (gotAllData != null) {
+ gotAllData.value = false;
}
}
}
@@ -193,7 +187,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
* @return The new currentNotCached pointer, following the cached buffer insertion.
*/
private DiskRangeList addCachedBufferToIter(
- DiskRangeList currentNotCached, DiskRangeList currentCached, CacheListHelper resultObj) {
+ DiskRangeList currentNotCached, DiskRangeList currentCached, BooleanRef gotAllData) {
if (currentNotCached.getOffset() >= currentCached.getOffset()) {
if (currentNotCached.getEnd() <= currentCached.getEnd()) { // we assume it's always "==" now
// Replace the entire current DiskRange with new cached range.
@@ -206,8 +200,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
} else {
// There's some part of current buffer that is not cached.
- if (resultObj != null) {
- resultObj.didGetAllData = false;
+ if (gotAllData != null) {
+ gotAllData.value = false;
}
assert currentNotCached.getOffset() < currentCached.getOffset()
|| currentNotCached.prev == null
@@ -238,13 +232,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public long[] putFileData(long fileId, DiskRange[] ranges,
- LlapMemoryBuffer[] buffers, long baseOffset, Priority priority) {
- return putFileData(fileId, ranges, buffers, baseOffset, priority, null);
- }
-
- @Override
- public long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] buffers,
+ public long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] buffers,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
long[] result = null;
assert buffers.length == ranges.length;
@@ -346,13 +334,13 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public void releaseBuffer(LlapMemoryBuffer buffer) {
+ public void releaseBuffer(MemoryBuffer buffer) {
unlockBuffer((LlapDataBuffer)buffer, true);
}
@Override
- public void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers) {
- for (LlapMemoryBuffer b : cacheBuffers) {
+ public void releaseBuffers(List<MemoryBuffer> cacheBuffers) {
+ for (MemoryBuffer b : cacheBuffers) {
unlockBuffer((LlapDataBuffer)b, true);
}
}
@@ -514,12 +502,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public LlapMemoryBuffer createUnallocated() {
- return new LlapDataBuffer();
- }
-
- @Override
- public boolean notifyReused(LlapMemoryBuffer buffer) {
+ public boolean reuseBuffer(MemoryBuffer buffer) {
// notifyReused implies that buffer is already locked; it's also called once for new
// buffers that are not cached yet. Don't notify cache policy.
return lockBuffer(((LlapDataBuffer)buffer), false);
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index 0b50749..acbaf85 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
public interface LowLevelCachePolicy extends LlapOomDebugDump {
void cache(LlapCacheableBuffer buffer, Priority priority);
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index a1ed7ea..9de159c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -28,7 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index b43b31d..f551edb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
index f422911..6cd0c4a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
@@ -18,16 +18,16 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
public class NoopCache<CacheKey> implements Cache<CacheKey> {
@Override
- public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value) {
+ public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value) {
return value;
}
@Override
- public StreamBuffer[] get(CacheKey key) {
+ public ColumnStreamData[] get(CacheKey key) {
return null; // TODO: ensure real implementation increases refcount
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index 5d2915f..5d16f72 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters;
/**
* Per query counters.
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index e90dbbc..152230c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 40daeec..063eb08 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -26,6 +26,7 @@ import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LogLevels;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
import org.apache.hadoop.hive.llap.cache.NoopCache;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
-import org.apache.hadoop.hive.llap.cache.Allocator;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index f561270..79d3b32 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.io.decode;
import java.util.List;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.InputSplit;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 4c61463..b1d34ec 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -22,11 +22,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
-import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
@@ -80,10 +80,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
synchronized (pendingData) {
localIsStopped = isStopped;
if (!localIsStopped) {
- targetBatch = pendingData.get(data.batchKey);
+ targetBatch = pendingData.get(data.getBatchKey());
if (targetBatch == null) {
targetBatch = data;
- pendingData.put(data.batchKey, data);
+ pendingData.put(data.getBatchKey(), data);
}
// We have the map locked; the code that throws things away from the map only bumps the version
// under the same map lock; code that throws things away here only bumps the version when
@@ -102,7 +102,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
throw new UnsupportedOperationException("Merging is not supported");
}
synchronized (pendingData) {
- targetBatch = isStopped ? null : pendingData.remove(data.batchKey);
+ targetBatch = isStopped ? null : pendingData.remove(data.getBatchKey());
// Check if someone already threw this away and changed the version.
localIsStopped = (targetBatchVersion != targetBatch.version);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 7db60e0..765ade3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -22,18 +22,18 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.InputSplit;