Posted to commits@hive.apache.org by se...@apache.org on 2015/07/27 22:48:43 UTC
[2/4] hive git commit: HIVE-11259 : LLAP: clean up ORC dependencies part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
index e66955d..e9c32ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -26,58 +28,60 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
-import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.CacheListHelper;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.InStream.TrackedCacheChunk;
+import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache;
+import org.apache.hadoop.hive.common.io.storage_api.DataReader;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Encoded reader implementation.
*
* Note about refcounts on cache blocks.
- * When we get or put blocks into cache, they are "locked" (refcount++). After that, we send the
- * blocks out to processor as part of RG data; one block can be used for multiple RGs. In some
- * cases, one block is sent for ALL rgs (e.g. a dictionary for string column). This is how we deal
- * with this:
- * For non-dictionary case:
- * 1) At all times, every buffer has +1 refcount for each time we sent this block to processing.
- * 2) When processor is done with an RG, it decrefs for all the blocks involved.
- * 3) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we send
- * the block to processor, and the latter decrefs it, the block won't be evicted when we want
- * to reuse it for some other RG, forcing us to do an extra disk read or cache lookup.
- * 4) As we read (we always read RGs in order, assume they are stored in physical order in the
- * file, plus that CBs are not shared between streams, AND that we read each stream from the
- * beginning), we note which blocks cannot possibly be reused anymore (next RG starts in the
- * next CB). We decref for the refcount from (3) in such case.
- * 5) Given that RG end boundary in ORC is an estimate, so we can request data from cache and then
- * not use it, at the end we go thru all the blocks, and release those not released by (4).
+ * When we get or put blocks into cache, they are "locked" (refcount++), so they cannot be evicted.
+ * We send the MemoryBuffer-s to caller as part of RG data; one MemoryBuffer can be used for many
+ * RGs (e.g. a dictionary, or multiple RGs per block). Also, we want to "unlock" MemoryBuffer-s in
+ * cache as soon as possible. This is how we deal with this:
+ *
* For dictionary case:
- * 1) We have a separate refcount on the ColumnBuffer object we send to the processor (in the
- * non-dictionary case, it's always 1, so when processor is done it goes directly to decrefing
- * cache buffers).
- * 2) In the dictionary case, it's increased per RG, and processors don't touch cache buffers if
- * they do not happen to decref this refcount to 0.
- * 3) This is done because dictionary can have many buffers; decrefing all of them for all RGs
- * is more expensive; plus, decrefing in cache may be more expensive due to cache policy/etc.
+ * 1) There's a separate refcount on the ColumnStreamData object we send to the caller. In the
+ * dictionary case, it's increased per RG, and callers don't release MBs if the containing
+ * ColumnStreamData is not ready to be released. This is done because a dictionary can have
+ * many buffers; decrefing all of them for all RGs is more expensive; plus, decrefing in cache
+ * may be more expensive due to cache policy/etc.
+ *
+ * For non-dictionary case:
+ * 1) All the ColumnStreamData-s for normal data always have refcount 1, because we return them once.
+ * 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of a CSD.
+ * 3) When the caller is done, it therefore decrefs the CSD to 0, and decrefs all the MBs involved.
+ * 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
+ * the MB to caller, and he decrefs it, the MB can't be evicted and will be there if we want to
+ * reuse it for some other RG.
+ * 5) As we read (we always read RGs in order and forward in each stream; we assume they are stored
+ * physically in order in the file; AND that CBs are not shared between streams), we note which
+ * MBs cannot possibly be reused anymore (next RG starts in the next CB). We decref the refcount
+ * from (4) in such case.
+ * 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
+ * not use it; thus, at the end we go thru all the MBs, and release those not released by (5).
*/
public class EncodedReaderImpl implements EncodedReader {
public static final Log LOG = LogFactory.getLog(EncodedReaderImpl.class);
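As an aside, the non-dictionary refcount protocol described in the class comment above can be illustrated with a minimal standalone sketch. This is not part of the patch; the Buffer class below is a hypothetical stand-in for MemoryBuffer plus the cache's refcounting.

    import java.util.ArrayList;
    import java.util.List;

    public class RefcountSketch {
      // Hypothetical stand-in for a cached MemoryBuffer with a refcount.
      static class Buffer {
        int refCount;
        void lock() { ++refCount; }    // refcount++ on cache get/put or reuse
        void unlock() { --refCount; }  // decref; evictable once it drops to 0
      }

      public static void main(String[] args) {
        Buffer mb = new Buffer();
        mb.lock();                       // (4) extra +1 "for the fetching thread"
        List<Buffer> rg1 = new ArrayList<>(), rg2 = new ArrayList<>();
        mb.lock(); rg1.add(mb);          // (2) +1 per ColumnStreamData the MB is returned in
        mb.lock(); rg2.add(mb);
        for (Buffer b : rg1) b.unlock(); // (3) caller done with RG1; refcount still 2, not evictable
        mb.unlock();                     // (5) reader sees the MB cannot be reused for later RGs
        for (Buffer b : rg2) b.unlock(); // caller done with RG2; refcount now 0, evictable
        System.out.println("final refcount: " + mb.refCount);
      }
    }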
@@ -114,52 +118,67 @@ public class EncodedReaderImpl implements EncodedReader {
t.reset();
}
});
- public static final FixedSizedObjectPool<StreamBuffer> SB_POOL =
- new FixedSizedObjectPool<>(8192, new PoolObjectHelper<StreamBuffer>() {
+ public static final FixedSizedObjectPool<ColumnStreamData> SB_POOL =
+ new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
+ @Override
+ protected ColumnStreamData create() {
+ return new ColumnStreamData();
+ }
+ @Override
+ protected void resetBeforeOffer(ColumnStreamData t) {
+ t.reset();
+ }
+ });
+ private static final FixedSizedObjectPool<CacheChunk> TCC_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<CacheChunk>() {
+ @Override
+ protected CacheChunk create() {
+ return new CacheChunk();
+ }
+ @Override
+ protected void resetBeforeOffer(CacheChunk t) {
+ t.reset();
+ }
+ });
+ private static final FixedSizedObjectPool<ProcCacheChunk> PCC_POOL =
+ new FixedSizedObjectPool<>(1024, new PoolObjectHelper<ProcCacheChunk>() {
@Override
- protected StreamBuffer create() {
- return new StreamBuffer();
+ protected ProcCacheChunk create() {
+ return new ProcCacheChunk();
}
@Override
- protected void resetBeforeOffer(StreamBuffer t) {
+ protected void resetBeforeOffer(ProcCacheChunk t) {
t.reset();
}
});
+ private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
+ @Override
+ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, offset, end);
+ return tcc;
+ }
+ };
private final long fileId;
- private FSDataInputStream file;
+ private final DataReader dataReader;
+ private boolean isDataReaderOpen = false;
private final CompressionCodec codec;
private final int bufferSize;
private final List<OrcProto.Type> types;
- private ZeroCopyReaderShim zcr;
private final long rowIndexStride;
- private final LowLevelCache cache;
+ private final DataCache cache;
private ByteBufferAllocatorPool pool;
- // For now, one consumer for all calls.
- private final Consumer<OrcEncodedColumnBatch> consumer;
- private final LowLevelCacheCounters qfCounters;
- private final FileSystem fs;
- private final Path path;
- private final boolean useZeroCopy;
-
- public EncodedReaderImpl(FileSystem fileSystem, Path path, long fileId, boolean useZeroCopy,
- List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate,
- LowLevelCache cache, LowLevelCacheCounters qfCounters,
- Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
+ private boolean isDebugTracingEnabled;
+
+ public EncodedReaderImpl(long fileId, List<OrcProto.Type> types, CompressionCodec codec,
+ int bufferSize, long strideRate, DataCache cache, DataReader dataReader) throws IOException {
this.fileId = fileId;
- this.fs = fileSystem;
- this.path = path;
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
this.rowIndexStride = strideRate;
this.cache = cache;
- this.qfCounters = qfCounters;
- this.consumer = consumer;
- this.useZeroCopy = useZeroCopy;
- if (useZeroCopy && !cache.getAllocator().isDirectAlloc()) {
- throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
- + "buffers; either disable zero-copy or enable direct cache allocation");
- }
+ this.dataReader = dataReader;
}
/** Helper context for each column being read */
@@ -228,7 +247,7 @@ public class EncodedReaderImpl implements EncodedReader {
/** Iterators for the buffers; used to maintain position in per-rg reading. */
DiskRangeList bufferIter;
/** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
- StreamBuffer stripeLevelStream;
+ ColumnStreamData stripeLevelStream;
@Override
public String toString() {
@@ -249,17 +268,15 @@ public class EncodedReaderImpl implements EncodedReader {
} else {
batchKey.set(fileId, stripeIx, rgIx);
}
- if (columnIxs == null || columnCount != columnIxs.length) {
- columnIxs = new int[columnCount];
- columnData = new StreamBuffer[columnCount][];
- }
+ resetColumnArrays(columnCount);
}
}
@Override
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
RowIndex[] indexes, List<ColumnEncoding> encodings, List<Stream> streamList,
- boolean[] included, boolean[][] colRgs) throws IOException {
+ boolean[] included, boolean[][] colRgs,
+ Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
// Note: for now we don't have to setError here, caller will setError if we throw.
// We are also not supposed to call setDone, since we are only part of the operation.
long stripeOffset = stripe.getOffset();
@@ -267,8 +284,8 @@ public class EncodedReaderImpl implements EncodedReader {
long offset = 0; // Stream offset in relation to the stripe.
// 1.1. Figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
+ if (isDebugTracingEnabled) {
+ LOG.info("The following columns have PRESENT streams: " + arrayToString(hasNull));
}
// We assume stream list is sorted by column and that non-data
@@ -278,7 +295,7 @@ public class EncodedReaderImpl implements EncodedReader {
ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
boolean[] includedRgs = null;
boolean isCompressed = (codec != null);
- DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
+ CreateHelper listToRead = new CreateHelper();
boolean hasIndexOnlyCols = false;
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
@@ -288,7 +305,7 @@ public class EncodedReaderImpl implements EncodedReader {
// We have a stream for included column, but in future it might have no data streams.
// It's more like "has at least one column included that has an index stream".
hasIndexOnlyCols = hasIndexOnlyCols | included[colIx];
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Skipping stream: " + streamKind + " at " + offset + ", " + length);
}
offset += length;
@@ -302,7 +319,7 @@ public class EncodedReaderImpl implements EncodedReader {
includedRgs = colRgs[colRgIx];
ctx = colCtxs[colRgIx] = COLCTX_POOL.take();
ctx.init(colIx, encodings.get(colIx), indexes[colIx]);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
}
} else {
@@ -312,13 +329,13 @@ public class EncodedReaderImpl implements EncodedReader {
int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
ctx.addStream(offset, stream, indexIx);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
+ ", " + length + ", index position " + indexIx);
}
if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
}
} else {
@@ -344,35 +361,23 @@ public class EncodedReaderImpl implements EncodedReader {
}
// 2. Now, read all of the ranges from cache or disk.
- CacheListHelper toRead = new CacheListHelper(listToRead.get());
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get());
+ if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
LOG.info("Resulting disk ranges to read (file " + fileId + "): "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
- cache.getFileData(fileId, toRead.next, stripeOffset, InStream.CC_FACTORY, qfCounters);
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ BooleanRef result = new BooleanRef();
+ cache.getFileData(fileId, toRead.next, stripeOffset, CC_FACTORY, result);
+ if (isDebugTracingEnabled && LOG.isInfoEnabled()) {
LOG.info("Disk ranges after cache (file " + fileId + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
- if (!toRead.didGetAllData) {
- long startTime = qfCounters.startTimeCounter();
- if (this.file == null) {
- this.file = fs.open(path);
- this.pool = useZeroCopy ? new ByteBufferAllocatorPool() : null;
- this.zcr = useZeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
- }
- // Force direct buffers if we will be decompressing to direct cache.
- RecordReaderUtils.readDiskRanges(
- file, zcr, stripeOffset, toRead.next, cache.getAllocator().isDirectAlloc());
- qfCounters.recordHdfsTime(startTime);
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
- LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + stripeOffset
- + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
+ if (!result.value) {
+ if (!isDataReaderOpen) {
+ this.dataReader.open();
+ isDataReaderOpen = true;
+ }
+ dataReader.readFileData(toRead.next, stripeOffset, cache.getAllocator().isDirectAlloc());
}
// 3. For uncompressed case, we need some special processing before read.
@@ -382,15 +387,14 @@ public class EncodedReaderImpl implements EncodedReader {
ColumnReadContext ctx = colCtxs[colIxMod];
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- DiskRangeList newIter = InStream.preReadUncompressedStream(fileId, stripeOffset,
- iter, sctx.offset, sctx.offset + sctx.length, zcr, cache, qfCounters);
+ DiskRangeList newIter = preReadUncompressedStream(
+ stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
if (newIter != null) {
iter = newIter;
}
}
}
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Disk ranges after pre-read (file " + fileId + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
@@ -418,10 +422,10 @@ public class EncodedReaderImpl implements EncodedReader {
ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- StreamBuffer cb = null;
+ ColumnStreamData cb = null;
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
}
@@ -436,9 +440,9 @@ public class EncodedReaderImpl implements EncodedReader {
// For stripe-level streams we don't need the extra refcount on the block.
// See class comment about refcounts.
long unlockUntilCOffset = sctx.offset + sctx.length;
- DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset, iter,
- sctx.offset, sctx.offset + sctx.length, zcr, codec, bufferSize, cache,
- sctx.stripeLevelStream, unlockUntilCOffset, sctx.offset, qfCounters);
+ DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
+ sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
+ unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
iter = lastCached;
}
@@ -459,11 +463,11 @@ public class EncodedReaderImpl implements EncodedReader {
cb = createRgStreamBuffer(
rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
boolean isStartOfStream = sctx.bufferIter == null;
- DiskRangeList lastCached = InStream.readEncodedStream(fileId, stripeOffset,
- (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, zcr, codec,
- bufferSize, cache, cb, unlockUntilCOffset, sctx.offset, qfCounters);
+ DiskRangeList lastCached = readEncodedStream(stripeOffset,
+ (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
+ unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
- sctx.bufferIter = iter = lastCached; // Reset iter just to ensure it's valid
+ sctx.bufferIter = iter = lastCached;
}
}
ecb.setStreamData(colIxMod, streamIx, cb);
@@ -475,25 +479,35 @@ public class EncodedReaderImpl implements EncodedReader {
}
releaseContexts(colCtxs);
- if ((DebugUtils.isTraceOrcEnabled() || DebugUtils.isTraceRangesEnabled())
- && LOG.isInfoEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Disk ranges after preparing all the data "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
// Release the unreleased buffers. See class comment about refcounts.
releaseInitialRefcounts(toRead.next);
- InStream.releaseCacheChunksIntoObjectPool(toRead.next);
+ releaseCacheChunksIntoObjectPool(toRead.next);
+ }
+
+
+ private static String arrayToString(boolean[] a) {
+ StringBuilder b = new StringBuilder();
+ b.append('[');
+ for (int i = 0; i < a.length; ++i) {
+ b.append(a[i] ? "1" : "0");
+ }
+ b.append(']');
+ return b.toString();
}
- private StreamBuffer createRgStreamBuffer(int rgIx, boolean isLastRg,
+ private ColumnStreamData createRgStreamBuffer(int rgIx, boolean isLastRg,
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
- StreamBuffer cb;
+ ColumnStreamData cb;
cb = SB_POOL.take();
cb.init(sctx.kind.getNumber());
cb.incRef();
- if (DebugUtils.isTraceOrcEnabled()) {
+ if (isDebugTracingEnabled) {
LOG.info("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")
+ "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
+ sctx.length + " index position " + sctx.streamIndexOffset + ": " +
@@ -521,31 +535,869 @@ public class EncodedReaderImpl implements EncodedReader {
while (current != null) {
DiskRangeList toFree = current;
current = current.next;
- if (!(toFree instanceof TrackedCacheChunk)) continue;
- TrackedCacheChunk cc = (TrackedCacheChunk)toFree;
- if (cc.isReleased) continue;
- LlapMemoryBuffer buffer = ((CacheChunk)toFree).buffer;
- if (DebugUtils.isTraceLockingEnabled()) {
- LOG.info("Unlocking " + buffer + " for the fetching thread at the end");
- }
+ if (!(toFree instanceof CacheChunk)) continue;
+ CacheChunk cc = (CacheChunk)toFree;
+ if (cc.getBuffer() == null) continue;
+ MemoryBuffer buffer = cc.getBuffer();
cache.releaseBuffer(buffer);
- cc.isReleased = true;
+ cc.setBuffer(null);
}
}
+ @Override
+ public void setDebugTracing(boolean isEnabled) {
+ this.isDebugTracingEnabled = isEnabled;
+ }
+
@Override
public void close() throws IOException {
- if (file != null) {
- try {
- file.close();
- } catch (IOException ex) {
- // Tez might have closed our filesystem. Log and ignore error.
- LOG.info("Failed to close file; ignoring: " + ex.getMessage());
- }
- }
+ dataReader.close();
if (pool != null) {
pool.clear();
}
}
+
+ /** DiskRange containing encoded, uncompressed data from cache. */
+ @VisibleForTesting
+ public static class CacheChunk extends DiskRangeList {
+ protected MemoryBuffer buffer;
+
+ public CacheChunk() {
+ super(-1, -1);
+ }
+
+ public void init(MemoryBuffer buffer, long offset, long end) {
+ this.buffer = buffer;
+ this.offset = offset;
+ this.end = end;
+ }
+
+ @Override
+ public boolean hasData() {
+ return buffer != null;
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ // Callers duplicate the buffer; they have to do so for BufferChunk anyway.
+ return buffer.getByteBufferRaw();
+ }
+
+ @Override
+ public String toString() {
+ return "start: " + offset + " end: " + end + " cache buffer: " + getBuffer();
+ }
+
+ @Override
+ public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
+ throw new UnsupportedOperationException("Cache chunk cannot be sliced - attempted ["
+ + this.offset + ", " + this.end + ") to [" + offset + ", " + end + ") ");
+ }
+
+ public MemoryBuffer getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(MemoryBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public void handleCacheCollision(DataCache cache,
+ MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void reset() {
+ init(null, -1, -1);
+ }
+ }
+
+ /**
+ * Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
+ * Makes assumptions about preRead code; for example, we add chunks here when they are
+ * already in the linked list, without unlinking. So, we record the start position in the
+ * original list, and then, when someone adds the next element, we merely increase the number
+ * of elements one has to traverse from that position to get the whole list.
+ */
+ private static class UncompressedCacheChunk extends CacheChunk {
+ private BufferChunk chunk;
+ private int count;
+
+ public UncompressedCacheChunk(BufferChunk bc) {
+ super();
+ init(null, bc.getOffset(), bc.getEnd());
+ chunk = bc;
+ count = 1;
+ }
+
+ public void addChunk(BufferChunk bc) {
+ assert bc.getOffset() == this.getEnd();
+ this.end = bc.getEnd();
+ ++count;
+ }
+
+ public BufferChunk getChunk() {
+ return chunk;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public void handleCacheCollision(DataCache cache,
+ MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
+ assert cacheBuffers == null;
+ // This is done at pre-read stage where there's nothing special w/refcounts. Just release.
+ cache.getAllocator().deallocate(getBuffer());
+ // Replace the buffer in our big range list, as well as in current results.
+ this.setBuffer(replacementBuffer);
+ }
+
+ public void clear() {
+ this.chunk = null;
+ this.count = -1;
+ }
+ }
+
+ /**
+ * CacheChunk that is pre-created for new cache data; initially, it contains an original disk
+ * buffer and an unallocated MemoryBuffer object. Before we expose it, the MB is allocated,
+ * the data is decompressed, and original compressed data is discarded. The chunk lives on in
+ * the DiskRange list created for the request, and everyone treats it like regular CacheChunk.
+ */
+ private static class ProcCacheChunk extends CacheChunk {
+ public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
+ ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) {
+ super.init(targetBuffer, cbStartOffset, cbEndOffset);
+ this.isOriginalDataCompressed = isCompressed;
+ this.originalData = originalData;
+ this.originalCbIndex = originalCbIndex;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.originalData = null;
+ }
+
+ @Override
+ public void handleCacheCollision(DataCache cache, MemoryBuffer replacementBuffer,
+ List<MemoryBuffer> cacheBuffers) {
+ assert originalCbIndex >= 0;
+ // Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
+ // and 1 from the reuseBuffer call above. "Old" buffer now has the 1 from put; new buffer
+ // is not in cache.
+ cache.getAllocator().deallocate(getBuffer());
+ cache.reuseBuffer(replacementBuffer);
+ // Replace the buffer in our big range list, as well as in current results.
+ this.buffer = replacementBuffer;
+ cacheBuffers.set(originalCbIndex, replacementBuffer);
+ originalCbIndex = -1; // This can only happen once at decompress time.
+ }
+
+ /** Original data that will be turned into encoded cache data in this.buffer and reset. */
+ private ByteBuffer originalData = null;
+ /** Whether originalData is compressed. */
+ private boolean isOriginalDataCompressed;
+ /** Index of the MemoryBuffer corresponding to this object inside the result list. If we
+ * hit a cache collision, we will replace this memory buffer with the one from cache at
+ * this index, without having to look for it. */
+ private int originalCbIndex;
+ }
+
+ /**
+ * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
+ * and remove what we have returned. We will keep iterator as a "hint" point.
+ * @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
+ * @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
+ * @param cOffset Start offset to decompress.
+ * @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
+ * @param streamBuffer Stream buffer, to add the results.
+ * @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
+ * they will not be used in future calls (see the class comment in
+ * EncodedReaderImpl about refcounts).
+ * @return Last buffer cached during decompression. Cache buffers are never removed from
+ * the master list, so they are safe to keep as iterators for various streams.
+ */
+ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
+ long endCOffset, ColumnStreamData streamBuffer, long unlockUntilCOffset, long streamOffset)
+ throws IOException {
+ if (streamBuffer.getCacheBuffers() == null) {
+ streamBuffer.setCacheBuffers(new ArrayList<MemoryBuffer>());
+ } else {
+ streamBuffer.getCacheBuffers().clear();
+ }
+ if (cOffset == endCOffset) return null;
+ boolean isCompressed = codec != null;
+ List<ProcCacheChunk> toDecompress = null;
+ List<ByteBuffer> toRelease = null;
+ if (isCompressed) {
+ toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
+ toDecompress = new ArrayList<ProcCacheChunk>();
+ }
+
+ // 1. Find our bearings in the stream. Normally, iter will already point either to where we
+ // want to be, or just before. However, RGs can overlap due to encoding, so we may have
+ // to return to a previous block.
+ DiskRangeList current = findExactPosition(start, cOffset);
+ if (isDebugTracingEnabled) {
+ LOG.info("Starting read for [" + cOffset + "," + endCOffset + ") at " + current);
+ }
+
+ CacheChunk lastUncompressed = null;
+
+ // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
+ lastUncompressed = isCompressed ?
+ prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
+ unlockUntilCOffset, current, streamBuffer, toRelease, toDecompress)
+ : prepareRangesForUncompressedRead(
+ cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, streamBuffer);
+
+ // 3. Allocate the buffers, prepare cache keys.
+ // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
+ // data and some unallocated membufs for decompression. toDecompress contains all the work we
+ // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
+ // has also been adjusted to point to these buffers instead of compressed data for the ranges.
+ if (toDecompress == null) return lastUncompressed; // Nothing to decompress.
+
+ MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
+ DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
+ int ix = 0;
+ for (ProcCacheChunk chunk : toDecompress) {
+ cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
+ targetBuffers[ix] = chunk.getBuffer();
+ ++ix;
+ }
+ cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+
+ // 4. Now decompress (or copy) the data into cache buffers.
+ for (ProcCacheChunk chunk : toDecompress) {
+ ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
+ if (chunk.isOriginalDataCompressed) {
+ decompressChunk(chunk.originalData, codec, dest);
+ } else {
+ copyUncompressedChunk(chunk.originalData, dest);
+ }
+
+ chunk.originalData = null;
+ if (isDebugTracingEnabled) {
+ LOG.info("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
+ }
+ cache.reuseBuffer(chunk.getBuffer());
+ }
+
+ // 5. Release original compressed buffers to zero-copy reader if needed.
+ if (toRelease != null) {
+ assert dataReader.isTrackingDiskRanges();
+ for (ByteBuffer buffer : toRelease) {
+ dataReader.releaseBuffer(buffer);
+ }
+ }
+
+ // 6. Finally, put uncompressed data to cache.
+ long[] collisionMask = cache.putFileData(fileId, cacheKeys, targetBuffers, baseOffset);
+ processCacheCollisions(collisionMask, toDecompress, targetBuffers, streamBuffer.getCacheBuffers());
+
+ // 7. It may happen that we know we won't use some compression buffers anymore.
+ // Release initial refcounts.
+ for (ProcCacheChunk chunk : toDecompress) {
+ ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
+ }
+
+ return lastUncompressed;
+ }
+
+ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer,
+ List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
+ if (cOffset > current.getOffset()) {
+ // Target compression block is in the middle of the range; slice the range in two.
+ current = current.split(cOffset).next;
+ }
+ long currentOffset = cOffset;
+ CacheChunk lastUncompressed = null;
+ while (true) {
+ DiskRangeList next = null;
+ if (current instanceof CacheChunk) {
+ // 2a. This is a decoded compression buffer, add as is.
+ CacheChunk cc = (CacheChunk)current;
+ if (isDebugTracingEnabled) {
+ LOG.info("Locking " + cc.getBuffer() + " due to reuse");
+ }
+ cache.reuseBuffer(cc.getBuffer());
+ streamBuffer.getCacheBuffers().add(cc.getBuffer());
+ currentOffset = cc.getEnd();
+ if (isDebugTracingEnabled) {
+ LOG.info("Adding an already-uncompressed buffer " + cc.getBuffer());
+ }
+ ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc);
+ lastUncompressed = cc;
+ next = current.next;
+ } else if (current instanceof IncompleteCb) {
+ // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
+ if (isDebugTracingEnabled) {
+ LOG.info("Cannot read " + current);
+ }
+ next = null;
+ currentOffset = -1;
+ } else {
+ // 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
+ // several disk ranges, so we might need to combine them.
+ BufferChunk bc = (BufferChunk)current;
+ ProcCacheChunk newCached = addOneCompressionBuffer(
+ bc, streamBuffer.getCacheBuffers(), toDecompress, toRelease);
+ lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
+ next = (newCached != null) ? newCached.next : null;
+ currentOffset = (next != null) ? next.getOffset() : -1;
+ }
+
+ if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
+ break;
+ }
+ current = next;
+ }
+ return lastUncompressed;
+ }
+
+ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
+ long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData streamBuffer)
+ throws IOException {
+ long currentOffset = cOffset;
+ CacheChunk lastUncompressed = null;
+ boolean isFirst = true;
+ while (true) {
+ DiskRangeList next = null;
+ assert current instanceof CacheChunk;
+ lastUncompressed = (CacheChunk)current;
+ if (isDebugTracingEnabled) {
+ LOG.info("Locking " + lastUncompressed.getBuffer() + " due to reuse");
+ }
+ cache.reuseBuffer(lastUncompressed.getBuffer());
+ if (isFirst) {
+ streamBuffer.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
+ isFirst = false;
+ }
+ streamBuffer.getCacheBuffers().add(lastUncompressed.getBuffer());
+ currentOffset = lastUncompressed.getEnd();
+ if (isDebugTracingEnabled) {
+ LOG.info("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
+ }
+ ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed);
+ next = current.next;
+ if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
+ break;
+ }
+ current = next;
+ }
+ return lastUncompressed;
+ }
+
+ /**
+ * To achieve consistent cache boundaries, we cache streams deterministically, in segments
+ * starting at the stream start and spanning either the whole stream or the maximum allocation
+ * size, whichever is smaller. If we are not reading the entire segment's worth of data, then
+ * we will not cache the partial RGs; the breakage of cache assumptions (no interleaving
+ * blocks, etc.) is way too much PITA to handle just for this case.
+ * We could avoid the copy in the non-zcr case and manage the buffer that was not allocated by
+ * our allocator. The uncompressed case is not mainline though, so let's not complicate it.
+ */
+ private DiskRangeList preReadUncompressedStream(long baseOffset,
+ DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
+ if (streamOffset == streamEnd) return null;
+ List<UncompressedCacheChunk> toCache = null;
+ List<ByteBuffer> toRelease = null;
+
+ // 1. Find our bearings in the stream.
+ DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
+ if (isDebugTracingEnabled) {
+ LOG.info("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
+ }
+
+ if (streamOffset > current.getOffset()) {
+ // Target compression block is in the middle of the range; slice the range in two.
+ current = current.split(streamOffset).next;
+ }
+ // Account for maximum cache buffer size.
+ long streamLen = streamEnd - streamOffset;
+ int partSize = cache.getAllocator().getMaxAllocation(),
+ partCount = (int)((streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0));
+ long partOffset = streamOffset, partEnd = Math.min(partOffset + partSize, streamEnd);
+
+ CacheChunk lastUncompressed = null;
+ MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
+ for (int i = 0; i < partCount; ++i) {
+ long hasEntirePartTo = -1;
+ if (partOffset == current.getOffset()) {
+ hasEntirePartTo = partOffset;
+ // We assume cache chunks would always match the way we read, so check and skip it.
+ if (current instanceof CacheChunk) {
+ lastUncompressed = (CacheChunk)current;
+ assert current.getOffset() == partOffset && current.getEnd() == partEnd;
+ partOffset = partEnd;
+ partEnd = Math.min(partOffset + partSize, streamEnd);
+ continue;
+ }
+ }
+ if (current.getOffset() >= partEnd) {
+ // We have no data at all for this part of the stream (could be unneeded), skip.
+ partOffset = partEnd;
+ partEnd = Math.min(partOffset + partSize, streamEnd);
+ continue;
+ }
+ if (toRelease == null && dataReader.isTrackingDiskRanges()) {
+ toRelease = new ArrayList<ByteBuffer>();
+ }
+ // We have some disk buffers... see if we have entire part, etc.
+ UncompressedCacheChunk candidateCached = null;
+ DiskRangeList next = current;
+ while (true) {
+ if (next == null || next.getOffset() >= partEnd) {
+ if (hasEntirePartTo < partEnd && candidateCached != null) {
+ // We are missing a section at the end of the part...
+ lastUncompressed = copyAndReplaceCandidateToNonCached(
+ candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+ candidateCached = null;
+ }
+ break;
+ }
+ current = next;
+ boolean wasSplit = (current.getEnd() > partEnd);
+ if (wasSplit) {
+ current = current.split(partEnd);
+ }
+ if (isDebugTracingEnabled) {
+ LOG.info("Processing uncompressed file data at ["
+ + current.getOffset() + ", " + current.getEnd() + ")");
+ }
+ BufferChunk bc = (BufferChunk)current;
+ if (!wasSplit && toRelease != null) {
+ toRelease.add(bc.chunk); // TODO: is it valid to give zcr the modified 2nd part?
+ }
+
+ // Track if we still have the entire part.
+ long hadEntirePartTo = hasEntirePartTo;
+ if (hasEntirePartTo != -1) {
+ hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
+ }
+ if (candidateCached != null && hasEntirePartTo == -1) {
+ lastUncompressed = copyAndReplaceCandidateToNonCached(
+ candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+ candidateCached = null;
+ }
+
+ if (hasEntirePartTo != -1) {
+ // So far we have all the data from the beginning of the part.
+ if (candidateCached == null) {
+ candidateCached = new UncompressedCacheChunk(bc);
+ } else {
+ candidateCached.addChunk(bc);
+ }
+ // We will take care of this at the end of the part, or if we find a gap.
+ next = current.next;
+ continue;
+ }
+ // We don't have the entire part; just copy to an allocated buffer. We could try to
+ // optimize a bit if we have contiguous buffers with gaps, but it's probably not needed.
+ lastUncompressed = copyAndReplaceUncompressedToNonCached(bc, cache, singleAlloc);
+ next = lastUncompressed.next;
+ }
+ if (candidateCached != null) {
+ if (toCache == null) {
+ toCache = new ArrayList<>(partCount - i);
+ }
+ toCache.add(candidateCached);
+ }
+ }
+
+ // 3. Allocate the buffers, prepare cache keys.
+ if (toCache == null) return lastUncompressed; // Nothing to copy and cache.
+
+ MemoryBuffer[] targetBuffers =
+ toCache.size() == 1 ? singleAlloc : new MemoryBuffer[toCache.size()];
+ targetBuffers[0] = null;
+ DiskRange[] cacheKeys = new DiskRange[toCache.size()];
+ int ix = 0;
+ for (UncompressedCacheChunk chunk : toCache) {
+ cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
+ ++ix;
+ }
+ cache.getAllocator().allocateMultiple(
+ targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
+
+ // 4. Now copy the data into cache buffers.
+ ix = 0;
+ for (UncompressedCacheChunk candidateCached : toCache) {
+ candidateCached.setBuffer(targetBuffers[ix]);
+ ByteBuffer dest = candidateCached.getBuffer().getByteBufferRaw();
+ copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached);
+ candidateCached.clear();
+ lastUncompressed = candidateCached;
+ ++ix;
+ }
+
+ // 5. Release original compressed buffers to zero-copy reader if needed.
+ if (toRelease != null) {
+ assert dataReader.isTrackingDiskRanges();
+ for (ByteBuffer buf : toRelease) {
+ dataReader.releaseBuffer(buf);
+ }
+ }
+
+ // 6. Finally, put uncompressed data to cache.
+ long[] collisionMask = cache.putFileData(fileId, cacheKeys, targetBuffers, baseOffset);
+ processCacheCollisions(collisionMask, toCache, targetBuffers, null);
+
+ return lastUncompressed;
+ }
+
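The part-splitting arithmetic at the top of preReadUncompressedStream can be checked in isolation. The sketch below is illustrative only and uses made-up stream bounds and a made-up maximum allocation size in place of cache.getAllocator().getMaxAllocation().

    public class PartSplitSketch {
      public static void main(String[] args) {
        long streamOffset = 1000, streamEnd = 11500;  // hypothetical stream bounds
        int maxAlloc = 4096;                          // hypothetical max cache allocation
        long streamLen = streamEnd - streamOffset;    // 10500
        int partCount = (int) ((streamLen / maxAlloc) + ((streamLen % maxAlloc) != 0 ? 1 : 0)); // 3
        long partOffset = streamOffset;
        for (int i = 0; i < partCount; ++i) {
          long partEnd = Math.min(partOffset + maxAlloc, streamEnd);
          System.out.println("part " + i + ": [" + partOffset + ", " + partEnd + ")");
          partOffset = partEnd;
        }
        // part 0: [1000, 5096), part 1: [5096, 9192), part 2: [9192, 11500)
      }
    }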
+ private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
+ int startPos = dest.position(), startLim = dest.limit();
+ dest.put(src); // Copy uncompressed data to cache.
+ // Put moves position forward by the size of the data.
+ int newPos = dest.position();
+ if (newPos > startLim) {
+ throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ + ") became [" + newPos + ", " + dest.limit() + ")");
+ }
+ dest.position(startPos);
+ dest.limit(newPos);
+ }
+
+
+ private static CacheChunk copyAndReplaceCandidateToNonCached(
+ UncompressedCacheChunk candidateCached, long partOffset,
+ long candidateEnd, DataCache cache, MemoryBuffer[] singleAlloc) {
+ // We thought we had the entire part to cache, but we don't; convert start to
+ // non-cached. Since we are at the first gap, the previous stuff must be contiguous.
+ singleAlloc[0] = null;
+ cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+
+ MemoryBuffer buffer = singleAlloc[0];
+ cache.reuseBuffer(buffer);
+ ByteBuffer dest = buffer.getByteBufferRaw();
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, partOffset, candidateEnd);
+ copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
+ return tcc;
+ }
+
+ private static CacheChunk copyAndReplaceUncompressedToNonCached(
+ BufferChunk bc, DataCache cache, MemoryBuffer[] singleAlloc) {
+ singleAlloc[0] = null;
+ cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+ MemoryBuffer buffer = singleAlloc[0];
+ cache.reuseBuffer(buffer);
+ ByteBuffer dest = buffer.getByteBufferRaw();
+ CacheChunk tcc = TCC_POOL.take();
+ tcc.init(buffer, bc.getOffset(), bc.getEnd());
+ copyUncompressedChunk(bc.chunk, dest);
+ bc.replaceSelfWith(tcc);
+ return tcc;
+ }
+
+ private static void copyAndReplaceUncompressedChunks(
+ UncompressedCacheChunk candidateCached, ByteBuffer dest, CacheChunk tcc) {
+ int startPos = dest.position(), startLim = dest.limit();
+ BufferChunk chunk = candidateCached.getChunk();
+ for (int i = 0; i < candidateCached.getCount(); ++i) {
+ dest.put(chunk.getData());
+ BufferChunk next = (BufferChunk)(chunk.next);
+ if (i == 0) {
+ chunk.replaceSelfWith(tcc);
+ } else {
+ chunk.removeSelf();
+ }
+ chunk = next;
+ }
+ int newPos = dest.position();
+ if (newPos > startLim) {
+ throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ + ") became [" + newPos + ", " + dest.limit() + ")");
+ }
+ dest.position(startPos);
+ dest.limit(newPos);
+ }
+
+ private static void decompressChunk(
+ ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
+ int startPos = dest.position(), startLim = dest.limit();
+ codec.decompress(src, dest);
+ // Codec resets the position to 0 and limit to correct limit.
+ dest.position(startPos);
+ int newLim = dest.limit();
+ if (newLim > startLim) {
+ throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
+ + ") became [" + dest.position() + ", " + newLim + ")");
+ }
+ }
+
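copyUncompressedChunk above leaves the destination buffer with its [position, limit) window covering exactly the bytes that were copied (decompressChunk relies on the codec to set the limit similarly). A minimal sketch of that ByteBuffer bookkeeping, with made-up data:

    import java.nio.ByteBuffer;

    public class DestWindowSketch {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        ByteBuffer dest = ByteBuffer.allocate(16);
        int startPos = dest.position();
        dest.put(src);                 // copy; advances dest.position() by 4
        int newPos = dest.position();
        dest.position(startPos);       // rewind to the start of the copied data
        dest.limit(newPos);            // cap the window at the end of the copied data
        System.out.println(dest.remaining()); // 4
      }
    }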
+ public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
+ while (current != null) {
+ if (current instanceof ProcCacheChunk) {
+ PCC_POOL.offer((ProcCacheChunk)current);
+ } else if (current instanceof CacheChunk) {
+ TCC_POOL.offer((CacheChunk)current);
+ }
+ current = current.next;
+ }
+ }
+
+ private void ponderReleaseInitialRefcount(
+ long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) {
+ if (cc.getEnd() > unlockUntilCOffset) return;
+ assert cc.getBuffer() != null;
+ releaseInitialRefcount(cc, false);
+ // Release all the previous buffers that we may not have been able to release due to reuse.
+ DiskRangeList prev = cc.prev;
+ while (true) {
+ if ((prev == null) || (prev.getEnd() <= streamStartOffset)
+ || !(prev instanceof CacheChunk)) break;
+ CacheChunk prevCc = (CacheChunk)prev;
+ if (prevCc.buffer == null) break;
+ releaseInitialRefcount(prevCc, true);
+ prev = prev.prev;
+ }
+ }
+
+ private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) {
+ // This is the last RG for which this buffer will be used. Remove the initial refcount
+ if (isDebugTracingEnabled) {
+ LOG.info("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ + (isBacktracking ? "; backtracking" : ""));
+ }
+ cache.releaseBuffer(cc.getBuffer());
+ cc.setBuffer(null);
+ }
+
+ private void processCacheCollisions(long[] collisionMask,
+ List<? extends CacheChunk> toDecompress, MemoryBuffer[] targetBuffers,
+ List<MemoryBuffer> cacheBuffers) {
+ if (collisionMask == null) return;
+ assert collisionMask.length >= (toDecompress.size() >>> 6);
+ // There are some elements that were cached in parallel, take care of them.
+ long maskVal = -1;
+ for (int i = 0; i < toDecompress.size(); ++i) {
+ if ((i & 63) == 0) {
+ maskVal = collisionMask[i >>> 6];
+ }
+ if ((maskVal & 1) == 1) {
+ // Cache has found an old buffer for the key and put it into array instead of our new one.
+ CacheChunk replacedChunk = toDecompress.get(i);
+ MemoryBuffer replacementBuffer = targetBuffers[i];
+ if (isDebugTracingEnabled) {
+ LOG.info("Discarding data due to cache collision: " + replacedChunk.getBuffer()
+ + " replaced with " + replacementBuffer);
+ }
+ assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
+ + "even though mask is [" + Long.toBinaryString(maskVal) + "]";
+ replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers);
+ }
+ maskVal >>= 1;
+ }
+ }
+
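The bitmask walked by processCacheCollisions packs 64 entries per long: bit i of word (i >>> 6) says whether entry i of the put collided with a buffer already in cache. A standalone sketch of the same loop with a made-up mask:

    public class CollisionMaskSketch {
      public static void main(String[] args) {
        long[] collisionMask = new long[] { 0b101L };  // entries 0 and 2 collided
        int size = 3;
        long maskVal = -1;
        for (int i = 0; i < size; ++i) {
          if ((i & 63) == 0) {
            maskVal = collisionMask[i >>> 6];          // load the next 64-entry word
          }
          boolean collided = (maskVal & 1) == 1;
          System.out.println("entry " + i + (collided ? " collided" : " was cached"));
          maskVal >>= 1;
        }
      }
    }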
+
+ /** Finds compressed offset in a stream and makes sure iter points to its position.
+ This may be necessary for obscure combinations of compression and encoding boundaries. */
+ private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) {
+ if (offset < 0) return ranges;
+ return findIntersectingPosition(ranges, offset, offset);
+ }
+
+ private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) {
+ if (offset < 0) return ranges;
+ // We expect the offset to be valid. TODO: rather, validate it.
+ while (ranges.getEnd() <= offset) {
+ ranges = ranges.next;
+ }
+ while (ranges.getOffset() > end) {
+ ranges = ranges.prev;
+ }
+ // We are now on some intersecting buffer, find the first intersecting buffer.
+ while (ranges.prev != null && ranges.prev.getEnd() > offset) {
+ ranges = ranges.prev;
+ }
+ return ranges;
+ }
+
+ private static class IncompleteCb extends DiskRangeList {
+ public IncompleteCb(long offset, long end) {
+ super(offset, end);
+ }
+
+ @Override
+ public String toString() {
+ return "incomplete CB start: " + offset + " end: " + end;
+ }
+ }
+
+ /**
+ * Reads one compression block from the source; handles compression blocks read from
+ * multiple ranges (usually, that would only happen with zcr).
+ * Adds stuff to cacheBuffers, toDecompress and toRelease (see below what each does).
+ * @param current BufferChunk where the compression block starts.
+ * @param cacheBuffers The result list to which the pre-allocated target cache buffer is added.
+ * @param toDecompress The list of work to decompress - pairs of compressed buffers and the
+ * target buffers (same as the ones added to cacheBuffers).
+ * @param toRelease The list of buffers to release to zcr because they are no longer in use.
+ * @return The resulting cache chunk.
+ */
+ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
+ List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
+ List<ByteBuffer> toRelease) throws IOException {
+ ByteBuffer slice = null;
+ ByteBuffer compressed = current.chunk;
+ long cbStartOffset = current.getOffset();
+ int b0 = compressed.get() & 0xff;
+ int b1 = compressed.get() & 0xff;
+ int b2 = compressed.get() & 0xff;
+ int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+ if (chunkLength > bufferSize) {
+ throw new IllegalArgumentException("Buffer size too small. size = " +
+ bufferSize + " needed = " + chunkLength);
+ }
+ int consumedLength = chunkLength + OutStream.HEADER_SIZE;
+ long cbEndOffset = cbStartOffset + consumedLength;
+ boolean isUncompressed = ((b0 & 0x01) == 1);
+ if (isDebugTracingEnabled) {
+ LOG.info("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
+ + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
+ }
+ if (compressed.remaining() >= chunkLength) {
+ // Simple case - CB fits entirely in the disk range.
+ slice = compressed.slice();
+ slice.limit(chunkLength);
+ ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
+ cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
+ if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+ toRelease.add(compressed);
+ }
+ return cc;
+ }
+ if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
+ addIncompleteCompressionBuffer(cbStartOffset, current, 0);
+ return null; // This is impossible to read from this chunk.
+ }
+
+ // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
+ // We need to consolidate 2 or more buffers into one to decompress.
+ ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+ int remaining = chunkLength - compressed.remaining();
+ int originalPos = compressed.position();
+ copy.put(compressed);
+ if (isDebugTracingEnabled) {
+ LOG.info("Removing partial CB " + current + " from ranges after copying its contents");
+ }
+ DiskRangeList next = current.next;
+ current.removeSelf();
+ if (dataReader.isTrackingDiskRanges()) {
+ if (originalPos == 0) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ } else {
+ toRelease.add(compressed); // There might be slices depending on this buffer.
+ }
+ }
+
+ int extraChunkCount = 0;
+ while (true) {
+ if (!(next instanceof BufferChunk)) {
+ throw new IOException("Trying to extend compressed block into uncompressed block " + next);
+ }
+ compressed = next.getData();
+ ++extraChunkCount;
+ if (compressed.remaining() >= remaining) {
+ // This is the last range for this compression block. Yay!
+ slice = compressed.slice();
+ slice.limit(remaining);
+ copy.put(slice);
+ ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
+ cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
+ if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ }
+ return cc;
+ }
+ remaining -= compressed.remaining();
+ copy.put(compressed);
+ if (dataReader.isTrackingDiskRanges()) {
+ dataReader.releaseBuffer(compressed); // We copied the entire buffer.
+ }
+ DiskRangeList tmp = next;
+ next = next.hasContiguousNext() ? next.next : null;
+ if (next != null) {
+ if (isDebugTracingEnabled) {
+ LOG.info("Removing partial CB " + tmp + " from ranges after copying its contents");
+ }
+ tmp.removeSelf();
+ } else {
+ addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount);
+ return null; // This is impossible to read from this chunk.
+ }
+ }
+ }
+
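The three header bytes read at the top of addOneCompressionBuffer are the standard ORC compression-chunk header: a little-endian 24-bit value whose low bit is the "original" (uncompressed) flag and whose upper 23 bits are the chunk length. A standalone sketch of the decoding, with a made-up chunk length:

    import java.nio.ByteBuffer;

    public class CbHeaderSketch {
      public static void main(String[] args) {
        // Header for a compressed chunk of 100,000 bytes: value = (100000 << 1) | 0.
        int headerValue = 100000 << 1;
        ByteBuffer compressed = ByteBuffer.wrap(new byte[] {
            (byte) (headerValue & 0xff),          // low byte; bit 0 is the "original" flag
            (byte) ((headerValue >> 8) & 0xff),
            (byte) ((headerValue >> 16) & 0xff) });
        int b0 = compressed.get() & 0xff;
        int b1 = compressed.get() & 0xff;
        int b2 = compressed.get() & 0xff;
        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
        boolean isUncompressed = (b0 & 0x01) == 1;
        System.out.println(chunkLength + " " + isUncompressed);  // 100000 false
      }
    }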
+ private void addIncompleteCompressionBuffer(
+ long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+ IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
+ if (isDebugTracingEnabled) {
+ LOG.info("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
+ + icb + " in the buffers");
+ }
+ target.replaceSelfWith(icb);
+ }
+
+ /**
+ * Add one buffer with compressed data to the results for addOneCompressionBuffer (see javadoc).
+ * @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data.
+ * @param isUncompressed Whether the data in the block is uncompressed.
+ * @param cbStartOffset Compressed start offset of the fCB.
+ * @param cbEndOffset Compressed end offset of the fCB.
+ * @param lastChunk The buffer chunk from which the last (or all) bytes of the fCB come.
+ * @param lastChunkLength The number of compressed bytes consumed from lastChunk into fullCompressionBlock.
+ * @param toDecompress See addOneCompressionBuffer.
+ * @param cacheBuffers See addOneCompressionBuffer.
+ * @return New cache buffer.
+ */
+ private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
+ boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
+ BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
+ // Prepare future cache buffer.
+ MemoryBuffer futureAlloc = cache.getAllocator().createUnallocated();
+ // Add it to result in order we are processing.
+ cacheBuffers.add(futureAlloc);
+ // Add it to the list of work to decompress.
+ ProcCacheChunk cc = PCC_POOL.take();
+ cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
+ fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
+ toDecompress.add(cc);
+ // Adjust the compression block position.
+ if (isDebugTracingEnabled) {
+ LOG.info("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
+ }
+ lastChunk.chunk.position(lastChunk.chunk.position() + lastChunkLength);
+ // Finally, put it in the ranges list for future use (if shared between RGs).
+ // Before anyone else accesses it, it would have been allocated and decompressed locally.
+ if (lastChunk.chunk.remaining() <= 0) {
+ if (isDebugTracingEnabled) {
+ LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
+ }
+ lastChunk.replaceSelfWith(cc);
+ } else {
+ if (isDebugTracingEnabled) {
+ LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
+ }
+ lastChunk.insertPartBefore(cc);
+ }
+ return cc;
+ }
+
+ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+ return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
index 9e2d281..c546e22 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedTreeReaderFactory.java
@@ -20,8 +20,9 @@ package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
/**
*
@@ -71,7 +72,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -88,9 +89,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer nanosStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData nanosStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
private boolean skipCorrupt;
@@ -105,17 +106,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setSecondsStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setSecondsStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setNanosStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+ public StreamReaderBuilder setNanosStream(ColumnStreamData secondaryStream) {
this.nanosStream = secondaryStream;
return this;
}
@@ -222,7 +223,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -251,10 +252,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer dictionaryStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData dictionaryStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -268,22 +269,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
this.lengthStream = lengthStream;
return this;
}
- public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+ public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
this.dictionaryStream = dictStream;
return this;
}
@@ -360,7 +361,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -374,8 +375,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -389,12 +390,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -463,7 +464,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -477,8 +478,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
private boolean skipCorrupt;
@@ -493,12 +494,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -572,7 +573,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -586,8 +587,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -601,12 +602,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -675,7 +676,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -689,8 +690,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -703,12 +704,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
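The repeated StreamReaderBuilder changes in this file all follow the same chained-setter builder style, now typed against ColumnStreamData instead of EncodedColumnBatch.StreamBuffer. As a reading aid only, here is a tiny self-contained sketch of that style; SketchReader, its Builder, and the String-typed fields are invented stand-ins rather than types from the patch:

public class SketchReader {
  private final String presentStream;
  private final String dataStream;

  private SketchReader(Builder b) {
    this.presentStream = b.presentStream;
    this.dataStream = b.dataStream;
  }

  public static class Builder {
    private String presentStream;
    private String dataStream;

    public Builder setPresentStream(String presentStream) {
      this.presentStream = presentStream;
      return this; // returning 'this' is what allows call chaining
    }

    public Builder setDataStream(String dataStream) {
      this.dataStream = dataStream;
      return this;
    }

    public SketchReader build() {
      return new SketchReader(this);
    }
  }

  public static void main(String[] args) {
    SketchReader r = new Builder()
        .setPresentStream("PRESENT bytes")
        .setDataStream("DATA bytes")
        .build();
    System.out.println(r.presentStream + " / " + r.dataStream);
  }
}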
@@ -771,7 +772,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -785,8 +786,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -799,12 +800,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -878,7 +879,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -895,9 +896,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer valueStream;
- private EncodedColumnBatch.StreamBuffer scaleStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData valueStream;
+ private ColumnStreamData scaleStream;
private int scale;
private int precision;
private CompressionCodec compressionCodec;
@@ -923,17 +924,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setValueStream(EncodedColumnBatch.StreamBuffer valueStream) {
+ public StreamReaderBuilder setValueStream(ColumnStreamData valueStream) {
this.valueStream = valueStream;
return this;
}
- public StreamReaderBuilder setScaleStream(EncodedColumnBatch.StreamBuffer scaleStream) {
+ public StreamReaderBuilder setScaleStream(ColumnStreamData scaleStream) {
this.scaleStream = scaleStream;
return this;
}
@@ -1004,7 +1005,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1018,8 +1019,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1033,12 +1034,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -1138,7 +1139,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1168,10 +1169,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private Long fileId;
private int columnIndex;
private int maxLength;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer dictionaryStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData dictionaryStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1190,22 +1191,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
this.lengthStream = lengthStream;
return this;
}
- public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+ public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
this.dictionaryStream = dictStream;
return this;
}
@@ -1312,7 +1313,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1342,10 +1343,10 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
private Long fileId;
private int columnIndex;
private int maxLength;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer dictionaryStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData dictionaryStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1364,22 +1365,22 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) {
this.lengthStream = lengthStream;
return this;
}
- public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+ public StreamReaderBuilder setDictionaryStream(ColumnStreamData dictStream) {
this.dictionaryStream = dictStream;
return this;
}
@@ -1455,7 +1456,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1469,8 +1470,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -1483,12 +1484,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -1561,7 +1562,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1578,9 +1579,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
- private EncodedColumnBatch.StreamBuffer lengthStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
+ private ColumnStreamData lengthStream;
private CompressionCodec compressionCodec;
private OrcProto.ColumnEncoding columnEncoding;
@@ -1594,17 +1595,17 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
- public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+ public StreamReaderBuilder setLengthStream(ColumnStreamData secondaryStream) {
this.lengthStream = secondaryStream;
return this;
}
@@ -1673,7 +1674,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
}
@Override
- public void setBuffers(EncodedColumnBatch.StreamBuffer[] buffers, boolean sameStripe)
+ public void setBuffers(ColumnStreamData[] buffers, boolean sameStripe)
throws IOException {
super.setBuffers(buffers, sameStripe);
if (_presentStream != null) {
@@ -1687,8 +1688,8 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
public static class StreamReaderBuilder {
private Long fileId;
private int columnIndex;
- private EncodedColumnBatch.StreamBuffer presentStream;
- private EncodedColumnBatch.StreamBuffer dataStream;
+ private ColumnStreamData presentStream;
+ private ColumnStreamData dataStream;
private CompressionCodec compressionCodec;
public StreamReaderBuilder setFileId(Long fileId) {
@@ -1701,12 +1702,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
return this;
}
- public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+ public StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) {
this.presentStream = presentStream;
return this;
}
- public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+ public StreamReaderBuilder setDataStream(ColumnStreamData dataStream) {
this.dataStream = dataStream;
return this;
}
@@ -1740,11 +1741,11 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
List<OrcProto.ColumnEncoding> encodings,
EncodedColumnBatch<OrcBatchKey> batch,
CompressionCodec codec, boolean skipCorrupt) throws IOException {
- long file = batch.batchKey.file;
+ long file = batch.getBatchKey().file;
TreeReader[] treeReaders = new TreeReader[numCols];
for (int i = 0; i < numCols; i++) {
- int columnIndex = batch.columnIxs[i];
- EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+ int columnIndex = batch.getColumnIxs()[i];
+ ColumnStreamData[] streamBuffers = batch.getColumnData()[i];
OrcProto.Type columnType = types.get(columnIndex);
// EncodedColumnBatch is already decompressed, we don't really need to pass codec.
@@ -1756,13 +1757,13 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex);
// stream buffers are arranged in enum order of stream kind
- EncodedColumnBatch.StreamBuffer present = null;
- EncodedColumnBatch.StreamBuffer data = null;
- EncodedColumnBatch.StreamBuffer dictionary = null;
- EncodedColumnBatch.StreamBuffer lengths = null;
- EncodedColumnBatch.StreamBuffer secondary = null;
- for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
- switch (streamBuffer.streamKind) {
+ ColumnStreamData present = null;
+ ColumnStreamData data = null;
+ ColumnStreamData dictionary = null;
+ ColumnStreamData lengths = null;
+ ColumnStreamData secondary = null;
+ for (ColumnStreamData streamBuffer : streamBuffers) {
+ switch (streamBuffer.getStreamKind()) {
case 0:
// PRESENT stream
present = streamBuffer;
@@ -1784,7 +1785,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory {
secondary = streamBuffer;
break;
default:
- throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+ throw new IOException("Unexpected stream kind: " + streamBuffer.getStreamKind());
}
}
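The loop at the end of the hunk above sorts each column's ColumnStreamData buffers into role variables (present, data, dictionary, lengths, secondary) by numeric stream kind, relying on the note that the buffers are arranged in enum order of stream kind. The standalone sketch below illustrates that ordinal-based dispatch; the enum here is an invented stand-in whose ordering merely mimics the low OrcProto.Stream.Kind values (PRESENT = 0 is the only one visible in the hunk), so treat the exact ordinals as an assumption:

public class StreamKindDispatchSketch {
  // Invented enum; the real code switches on OrcProto.Stream.Kind numbers.
  enum Kind { PRESENT, DATA, LENGTH, DICTIONARY_DATA, DICTIONARY_COUNT, SECONDARY }

  static String describe(int streamKind) {
    Kind[] kinds = Kind.values();
    if (streamKind < 0 || streamKind >= kinds.length) {
      // Mirrors the default branch above.
      throw new IllegalArgumentException("Unexpected stream kind: " + streamKind);
    }
    return kinds[streamKind].name();
  }

  public static void main(String[] args) {
    System.out.println(describe(0)); // PRESENT, as in the "case 0" branch above
    System.out.println(describe(5)); // SECONDARY, under the assumed ordering
  }
}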