You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:50:04 UTC
svn commit: r1625344 [2/4] - in /hive/branches/llap: ./
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/tez/
itests/qtest/ llap-client/ llap-client/src/ llap-client/src/java/
llap-client/src/java/org/ llap-client/src/java/org/apache/ ...
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.chunk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.DoubleBuffer;
+import java.nio.LongBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.Decimal128;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils.RleSegmentType;
+
+/**
+ * Chunk reader; reads chained chunks (we might want to separate this later).
+ *
+ * Initial chunk format:
+ * [version byte][int number of rows][padding to 8 bytes](segments)(if string, [dictionary])
+ * Version is 0 for initial format. Segment format:
+ * [type byte][int number of rows][padding to 8 bytes](values).
+ * One value is stored for repeated segments, and none for null-repeated segments. For non-repeated
+ * segments with nulls, values and bitmasks are interspersed: an N-byte bitmask is followed by N*8
+ * values, repeated. The last bitmask may be smaller if there are fewer values, but it is still
+ * rounded up to 8 bytes. Bitmask bits are read most-significant bit first, and a set bit marks a
+ * null row. Values for nulls are still stored (we could save space by not storing them, like ORC).
+ *
+ * Values are stored with fixed length: 8 bytes for long and double; 8 bytes for strings
+ * (dictionary offset+length); TODO bytes for decimals. Values stored for NULLs are undefined.
+ *
+ * Usage: {@link #next(int)} selects the next batch of rows (as a list of segments), then the
+ * copy/get methods materialize that batch. Instances keep mutable per-batch state and shared
+ * copier helpers without any synchronization.
+ */
+public class ChunkReader implements Vector.ColumnReader {
+  private final Type type;
+  /** Chunk currently being read, or the next one to read; null once the chain is exhausted. */
+  private Chunk chunk;
+  // Parallel per-segment lists describing the batch selected by the last next() call.
+  private List<ByteBuffer> chunkBuffers;
+  private List<RleSegmentType> segmentTypes;
+  private List<Integer> segmentOffsets, segmentRowCounts;
+  /** Rows to skip in the first segment of the batch; -1 until a segment is added. */
+  private int rowOffsetInFirstSegment;
+
+  // Cursor into the current chunk: rows not yet handed out by next(), rows already consumed
+  // from the segment at nextSegmentOffset, and the byte offset of that segment's header.
+  private int remainingRowsInLastChunk = 0, rowOffsetInNextSegment = 0, nextSegmentOffset = 0;
+
+  /** Row count requested by the last next() call; bounds how much the copy methods read. */
+  private int lastRowCountNeeded = 0;
+
+  /**
+   * @param type Column type; selects the fixed value size used to walk segment data.
+   * @param chunk First chunk of the chain to read.
+   * @throws UnsupportedOperationException if the chunk does not start with the expected
+   *     format version byte.
+   */
+  public ChunkReader(Type type, Chunk chunk) {
+    this.type = type;
+    this.chunk = chunk;
+    // Verify chunk version.
+    byte firstByte = chunk.buffer.getContents().get(chunk.offset);
+    if (firstByte != ChunkUtils.FORMAT_VERSION) {
+      throw new UnsupportedOperationException("Chunk doesn't start as expected: " + firstByte);
+    }
+    // The bitmask offset arithmetic below relies on whole-byte, 8-aligned bitmasks.
+    if (Integer.bitCount(ChunkUtils.BITMASK_SIZE_BYTES) != 1
+        || (ChunkUtils.BITMASK_SIZE_BYTES < 8)) {
+      throw new AssertionError("Must be a power of two >= 8: "+ ChunkUtils.BITMASK_SIZE_BYTES);
+    }
+  }
+
+  /**
+   * @return Number of rows not yet selected by next(): what is left of the current chunk plus
+   *     the row counts stored in the headers of all following chunks.
+   */
+  public int getNumRowsRemaining() {
+    if (this.chunk == null) return 0;
+    int result = remainingRowsInLastChunk;
+    Chunk chunk = this.chunk;
+    if (remainingRowsInLastChunk > 0) {
+      chunk = chunk.nextChunk; // remainingRowsInLastChunk accounts for current one
+    }
+    while (chunk != null) {
+      ByteBuffer bb = chunk.buffer.getContents();
+      result += bb.getInt(chunk.offset + 1); // row count follows the version byte
+      chunk = chunk.nextChunk;
+    }
+    return result;
+  }
+
+  /**
+   * Selects the next batch of up to {@code rowCountNeeded} rows, possibly spanning several
+   * segments and chunks; fewer rows are selected if the chain runs out. A segment that is only
+   * partially consumed is revisited by the following call.
+   * @param rowCountNeeded Number of rows to select; 0 is a no-op.
+   */
+  public void next(int rowCountNeeded) {
+    lastRowCountNeeded = rowCountNeeded;
+    if (rowCountNeeded == 0) return;
+    ByteBuffer chunkBuffer = null;
+    if (chunkBuffers == null) {
+      init();
+    } else {
+      if (remainingRowsInLastChunk > 0) {
+        // Resume inside the chunk the previous batch ended in.
+        chunkBuffer = chunkBuffers.get(chunkBuffers.size() - 1);
+      }
+      reset();
+    }
+
+    while (chunk != null && rowCountNeeded > 0) {
+      if (chunkBuffer == null) {
+        chunkBuffer = createChunkBuffer(chunk);
+        remainingRowsInLastChunk = chunkBuffer.getInt(1); // skip header byte
+        nextSegmentOffset = 8; // first segment follows the 8-byte chunk header
+      }
+      while (rowCountNeeded > 0 && remainingRowsInLastChunk > 0) {
+        int segmentOffset = nextSegmentOffset;
+        assert (segmentOffset & 7) == 0; // must be 8-byte aligned
+        RleSegmentType segmentType = RleSegmentType.fromInt(chunkBuffer.get(nextSegmentOffset));
+        ++nextSegmentOffset;
+        int segmentRowCount = chunkBuffer.getInt(nextSegmentOffset);
+        int segmentRowOffset = rowOffsetInNextSegment; // Only non-zero for the first segment.
+        nextSegmentOffset += 7; // rest of the 8-byte segment header
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Segment type " + segmentType + " with " + segmentRowCount
+              + " rows (skipping " + segmentRowOffset + "); at " + nextSegmentOffset + "; "
+              + remainingRowsInLastChunk + " more in this chunk including this segment");
+        }
+        int dataLength = ChunkUtils.getSegmentDataSize(type, segmentType, segmentRowCount);
+        int segmentRowCountAvail = segmentRowCount - segmentRowOffset;
+        if (segmentRowCountAvail > rowCountNeeded) {
+          // We have some rows remaining in the same segment; the next call re-reads its header.
+          nextSegmentOffset = segmentOffset;
+          rowOffsetInNextSegment = segmentRowOffset + rowCountNeeded;
+          segmentRowCountAvail = rowCountNeeded;
+        } else {
+          nextSegmentOffset += dataLength;
+          rowOffsetInNextSegment = 0;
+        }
+        remainingRowsInLastChunk -= segmentRowCountAvail;
+        rowCountNeeded -= segmentRowCountAvail;
+
+        // Finally, add segment to data.
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Adding segment with " + segmentRowCountAvail + " rows in "
+              + chunk.buffer + "; segment row offset " + segmentRowOffset);
+        }
+        chunkBuffers.add(chunkBuffer);
+        segmentTypes.add(segmentType);
+        segmentOffsets.add(segmentOffset);
+        segmentRowCounts.add(segmentRowCount);
+        if (rowOffsetInFirstSegment == -1) {
+          rowOffsetInFirstSegment = segmentRowOffset;
+        }
+      }
+      if (remainingRowsInLastChunk == 0) {
+        // We are done with current chunk.
+        chunk = chunk.nextChunk;
+        nextSegmentOffset = 0;
+      } else {
+        assert rowCountNeeded == 0;
+      }
+      chunkBuffer = null;
+    }
+    assert !chunkBuffers.isEmpty() : "No rows found, expected " + rowCountNeeded;
+  }
+
+  /** Allocates the per-batch lists on the first call to next(). */
+  private void init() {
+    chunkBuffers = new ArrayList<ByteBuffer>();
+    segmentTypes = new ArrayList<RleSegmentType>();
+    segmentOffsets = new ArrayList<Integer>();
+    segmentRowCounts = new ArrayList<Integer>();
+    remainingRowsInLastChunk = 0;
+    rowOffsetInFirstSegment = -1;
+  }
+
+  /** Clears the previous batch; the position inside the current chunk is kept. */
+  private void reset() {
+    chunkBuffers.clear();
+    segmentRowCounts.clear();
+    segmentTypes.clear();
+    segmentOffsets.clear();
+    rowOffsetInFirstSegment = -1;
+  }
+
+  /**
+   * Creates a view of the chunk's bytes whose index 0 is the chunk start and whose limit is the
+   * chunk length. Note: this moves the position of the shared contents buffer.
+   */
+  private ByteBuffer createChunkBuffer(Chunk chunk) {
+    ByteBuffer bb = chunk.buffer.getContents();
+    if (DebugUtils.isTraceEnabled()) {
+      Llap.LOG.info("Chunk in " + chunk.buffer
+          + " at " + chunk.offset + ", length " + chunk.length);
+    }
+    bb.position(chunk.offset);
+    ByteBuffer chunkBuffer = bb.slice();
+    chunkBuffer.limit(chunk.length);
+    return chunkBuffer;
+  }
+
+  /**
+   * @return true if the current batch is a single repeating (value or null) segment; false for
+   *     multi-segment or empty batches.
+   */
+  @Override
+  public boolean isSameValue() {
+    // Assume repeated values wouldn't be written as separate segments
+    if (segmentTypes.size() != 1) return false;
+    RleSegmentType segmentType = segmentTypes.get(0);
+    return segmentType == RleSegmentType.REPEATING_NULL
+        || segmentType == RleSegmentType.REPEATING_VALUE;
+  }
+
+  /** @return true if any segment of the current batch may contain nulls. */
+  @Override
+  public boolean hasNulls() {
+    for (RleSegmentType segmentType : segmentTypes) {
+      if (segmentType == RleSegmentType.REPEATING_NULL
+          || segmentType == RleSegmentType.UNIQUE_NULL_BITMASK) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // TODO: Ideally we want to get rid of this and process segments as we go (nextLongs ...);
+  // however, this may preclude us from predicting whether there are nulls, etc.
+  // Plus code will be even more complex. But might be worth it.
+  /**
+   * Copies the rows selected by the last next() call into {@code dest}. May be called more than
+   * once for the same batch.
+   * @param dest Destination values; entries for null rows are left untouched or undefined.
+   * @param isNull Destination null flags; may be null only if hasNulls() is false.
+   * @param destOffset Index in dest/isNull of the first copied row.
+   * @throws IOException on an unsupported segment type.
+   */
+  @Override
+  public void copyLongs(long[] dest, boolean[] isNull, int destOffset) throws IOException {
+    int rowsLeft = lastRowCountNeeded;
+    int segmentRowOffset = rowOffsetInFirstSegment;
+    for (int segmentIx = 0; segmentIx < segmentTypes.size(); ++segmentIx) {
+      int segmentRowCount = segmentRowCounts.get(segmentIx);
+      RleSegmentType segmentType = segmentTypes.get(segmentIx);
+      ByteBuffer chunkBuffer = chunkBuffers.get(segmentIx);
+      int rowCountToRead = Math.min(segmentRowCount - segmentRowOffset, rowsLeft);
+      rowsLeft -= rowCountToRead;
+      if (DebugUtils.isTraceEnabled()) {
+        Llap.LOG.info("Copying " + rowCountToRead + " rows from segment " + segmentIx
+            + " of type " + segmentType + " segment at " + segmentOffsets.get(segmentIx)
+            + " using row offset " + segmentRowOffset + " to result offset " + destOffset);
+      }
+      switch (segmentType) {
+      case REPEATING_NULL: {
+        assert isNull != null;
+        Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, true);
+        destOffset += rowCountToRead;
+        break;
+      }
+      case REPEATING_VALUE: {
+        long value = chunkBuffer.getLong(segmentOffsets.get(segmentIx) + 8);
+        Arrays.fill(dest, destOffset, destOffset + rowCountToRead, value);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, false);
+        }
+        destOffset += rowCountToRead;
+        break;
+      }
+      case UNIQUE_NOT_NULL: {
+        int dataOffset = segmentOffsets.get(segmentIx) + 8;
+        assert (dataOffset & 7) == 0; // Must be 8-byte aligned.
+        copyLongValues(chunkBuffer, dataOffset, segmentRowOffset,
+            dest, destOffset, rowCountToRead);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, false);
+        }
+        destOffset += rowCountToRead;
+        break;
+      }
+      case UNIQUE_NULL_BITMASK: {
+        longCopier.initDest(dest);
+        destOffset = copyValuesWithNulls(chunkBuffer, segmentOffsets.get(segmentIx),
+            segmentRowOffset, segmentRowCount, longCopier, isNull, destOffset, rowCountToRead);
+        break;
+      }
+      default: throw new IOException("Unsupported segment type " + segmentType);
+      }
+      segmentRowOffset = 0; // only the first segment of a batch starts mid-segment
+    }
+  }
+
+  /** Bulk-copies contiguous longs starting {@code segmentRowOffset} values after dataOffset. */
+  private void copyLongValues(ByteBuffer chunkBuffer, int dataOffset,
+      int segmentRowOffset, long[] dest, int destOffset, int rowCountToRead) {
+    LongBuffer longBuffer = chunkBuffer.asLongBuffer();
+    longBuffer.position((dataOffset >>> 3) + segmentRowOffset);
+    longBuffer.get(dest, destOffset, rowCountToRead);
+    if (DebugUtils.isTraceDataEnabled()) {
+      Llap.LOG.info("Copied " + rowCountToRead + " rows from long offset "
+          + ((dataOffset >>> 3) + segmentRowOffset) + " (" + dataOffset + ", "
+          + segmentRowOffset + "): " + DebugUtils.toString(dest, destOffset, rowCountToRead));
+      Llap.LOG.debug("VRB vector now looks like " + Arrays.toString(dest));
+    }
+  }
+
+  /**
+   * Copies the rows selected by the last next() call into {@code dest}. May be called more than
+   * once for the same batch.
+   * @param dest Destination values; entries for null rows are left untouched or undefined.
+   * @param isNull Destination null flags; may be null only if hasNulls() is false.
+   * @param destOffset Index in dest/isNull of the first copied row.
+   * @throws IOException on an unsupported segment type.
+   */
+  @Override
+  public void copyDoubles(double[] dest, boolean[] isNull, int destOffset) throws IOException {
+    int rowsLeft = lastRowCountNeeded;
+    int segmentRowOffset = rowOffsetInFirstSegment;
+    for (int segmentIx = 0; segmentIx < segmentTypes.size(); ++segmentIx) {
+      int segmentRowCount = segmentRowCounts.get(segmentIx);
+      RleSegmentType segmentType = segmentTypes.get(segmentIx);
+      ByteBuffer chunkBuffer = chunkBuffers.get(segmentIx);
+      int rowCountToRead = Math.min(segmentRowCount - segmentRowOffset, rowsLeft);
+      rowsLeft -= rowCountToRead;
+      switch (segmentType) {
+      case REPEATING_NULL: {
+        assert isNull != null;
+        Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, true);
+        destOffset += rowCountToRead;
+        break;
+      }
+      case REPEATING_VALUE: {
+        // Read the stored bits as a double (as getDouble() does), not as a long converted
+        // numerically to double.
+        double value = chunkBuffer.getDouble(segmentOffsets.get(segmentIx) + 8);
+        Arrays.fill(dest, destOffset, destOffset + rowCountToRead, value);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, false);
+        }
+        destOffset += rowCountToRead;
+        break;
+      }
+      case UNIQUE_NOT_NULL: {
+        int dataOffset = segmentOffsets.get(segmentIx) + 8;
+        assert (dataOffset & 7) == 0; // Must be 8-byte aligned.
+        copyDoubleValues(chunkBuffer, dataOffset, segmentRowOffset,
+            dest, destOffset, rowCountToRead);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, false);
+        }
+        destOffset += rowCountToRead;
+        break;
+      }
+      case UNIQUE_NULL_BITMASK: {
+        doubleCopier.initDest(dest);
+        destOffset = copyValuesWithNulls(chunkBuffer, segmentOffsets.get(segmentIx),
+            segmentRowOffset, segmentRowCount, doubleCopier, isNull, destOffset, rowCountToRead);
+        break;
+      }
+      default: throw new IOException("Unsupported segment type " + segmentType);
+      }
+      segmentRowOffset = 0; // only the first segment of a batch starts mid-segment
+    }
+  }
+
+  /** Bulk-copies contiguous doubles starting {@code segmentRowOffset} values after dataOffset. */
+  private void copyDoubleValues(ByteBuffer chunkBuffer, int dataOffset,
+      int segmentRowOffset, double[] dest, int destOffset, int rowCountToRead) {
+    DoubleBuffer doubleBuffer = chunkBuffer.asDoubleBuffer();
+    doubleBuffer.position((dataOffset >>> 3) + segmentRowOffset);
+    doubleBuffer.get(dest, destOffset, rowCountToRead);
+  }
+
+  /**
+   * Copies rows from a UNIQUE_NULL_BITMASK segment, in which each bitmask (BITMASK_SIZE_BYTES
+   * bytes, except possibly the last) is immediately followed by the values it covers.
+   * @param chunkBuffer Chunk view containing the segment.
+   * @param segmentDataOffset Offset of the segment header; the first bitmask is 8 bytes later.
+   * @param segmentRowOffset Rows at the start of the segment to skip.
+   * @param segmentRowCount Total rows in the segment; determines the last bitmask's size.
+   * @param valueHelper Value copier whose destination has already been set.
+   * @param isNull Destination null flags.
+   * @param destOffset Index of the first destination row.
+   * @param rowCountToRead Number of rows to copy.
+   * @return Destination index just past the copied rows.
+   */
+  private int copyValuesWithNulls(ByteBuffer chunkBuffer, int segmentDataOffset,
+      int segmentRowOffset, int segmentRowCount, ValueCopier valueHelper, boolean[] isNull,
+      int destOffset, int rowCountToRead) {
+    if (rowCountToRead == 0) return destOffset;
+    int valueSize = ChunkUtils.TYPE_SIZES[type.value()];
+    // Prepare to read (or skip) the first bitmask.
+    int bitmasksSkipped = 0;
+    int currentBitmaskOffset = segmentDataOffset + 8;
+    int currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+    // Size of bitmask and corresponding values in bytes, for a BITMASK_SIZE_BYTES-sized bitmask.
+    int sizeOfBitmaskAndValues = ChunkUtils.getFullBitmaskSize(valueSize);
+    valueHelper.initSrc(chunkBuffer);
+
+    // For the first segment, we might have to skip some rows. This is sadly most of this method.
+    if (segmentRowOffset > 0) {
+      // First, see how many full bitmasks we need to skip.
+      int bitmasksToSkip = (segmentRowOffset / ChunkUtils.BITMASK_SIZE_BITS);
+      assert bitmasksToSkip == 0 || currentBitmaskSize == ChunkUtils.BITMASK_SIZE_BYTES;
+      bitmasksSkipped += bitmasksToSkip;
+      currentBitmaskOffset += (bitmasksToSkip * sizeOfBitmaskAndValues);
+      currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+      segmentRowOffset = segmentRowOffset % ChunkUtils.BITMASK_SIZE_BITS;
+
+      // Remember how many values we are skipping in the current bitmask, for value copying.
+      int valuesToSkip = segmentRowOffset;
+      // Then, in the bitmask we are in, skip however many full bytes we need to skip.
+      int currentOffsetInBitmask = 0;
+      if (segmentRowOffset >= 8) {
+        int bytesToSkip = segmentRowOffset >>> 3;
+        currentOffsetInBitmask = bytesToSkip;
+        segmentRowOffset = segmentRowOffset & 7;
+      }
+
+      if (DebugUtils.isTraceEnabled()) {
+        Llap.LOG.info("Skipping " + bitmasksToSkip + " bitmasks and "
+            + currentOffsetInBitmask + " bytes; for a bitmask at " + currentBitmaskOffset
+            + " will skip " + valuesToSkip + " values and " + segmentRowOffset + " bits");
+      }
+      // Finally, we may need to skip some bits in the first byte we are reading.
+      // Read the partial byte of the bitmask (and corresponding values).
+      if (segmentRowOffset > 0) {
+        int partialByteRowCount = Math.min(rowCountToRead, 8 - segmentRowOffset);
+        byte partialByte = chunkBuffer.get(currentBitmaskOffset + currentOffsetInBitmask);
+        copyBitsFromByte(partialByte, isNull, destOffset, segmentRowOffset, partialByteRowCount);
+        valueHelper.copyValues(destOffset,
+            currentBitmaskOffset + currentBitmaskSize, valuesToSkip, partialByteRowCount);
+        if (DebugUtils.isTraceDataEnabled()) {
+          // Log the byte actually read (the original logged the bitmask's first byte).
+          Llap.LOG.info("After partial first byte w/" + partialByteRowCount
+              + ", byte was " + partialByte);
+          Llap.LOG.debug("After partial first byte w/" + partialByteRowCount
+              + ", booleans are " + DebugUtils.toString(isNull));
+        }
+
+        rowCountToRead -= partialByteRowCount;
+        destOffset += partialByteRowCount;
+        ++currentOffsetInBitmask;
+        if (currentOffsetInBitmask == ChunkUtils.BITMASK_SIZE_BYTES && rowCountToRead > 0) {
+          // We only needed part of the last byte from this bitmask, go to the next one.
+          ++bitmasksSkipped;
+          currentBitmaskOffset += sizeOfBitmaskAndValues;
+          currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+          currentOffsetInBitmask = 0;
+        }
+      }
+
+      if (rowCountToRead == 0) return destOffset;
+
+      // Then, if we have a partial bitmask, get to the boundary.
+      if (currentOffsetInBitmask > 0) {
+        // First, copy the bits, then the values at the same destOffset.
+        for (int i = currentOffsetInBitmask, tmpToRead = rowCountToRead, tmpOffset = destOffset;
+            (i < currentBitmaskSize) && (tmpToRead > 0); ++i) {
+          int bitsToRead = Math.min(tmpToRead, 8);
+          copyBitsFromByte(chunkBuffer.get(currentBitmaskOffset + i),
+              isNull, tmpOffset, 0, bitsToRead);
+          if (DebugUtils.isTraceDataEnabled()) {
+            Llap.LOG.info("After copying " + bitsToRead +" bits from byte at "
+                + (currentBitmaskOffset + i) + " to " + tmpOffset + ", booleans are "
+                + DebugUtils.toString(isNull));
+          }
+          tmpOffset += bitsToRead;
+          tmpToRead -= bitsToRead;
+        }
+        valuesToSkip = currentOffsetInBitmask << 3;
+        int valuesToRead = Math.min((currentBitmaskSize << 3) - valuesToSkip, rowCountToRead);
+        valueHelper.copyValues(destOffset, currentBitmaskOffset + currentBitmaskSize,
+            valuesToSkip, valuesToRead);
+        destOffset += valuesToRead;
+        rowCountToRead -= valuesToRead;
+        if (rowCountToRead == 0) return destOffset;
+        // Go to next bitmask.
+        currentBitmaskOffset += sizeOfBitmaskAndValues;
+        ++bitmasksSkipped;
+        currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+      }
+    } // end of the epic "segmentRowOffset > 0" if
+
+    // This is the main code path
+    if (DebugUtils.isTraceEnabled()) {
+      Llap.LOG.info("After segment offset, reading " + rowCountToRead + " rows from data at "
+          + currentBitmaskOffset + " with " + segmentRowCount + " rows to offset " + destOffset
+          + "; bitmask size " + currentBitmaskSize);
+    }
+
+    // Now we are finally done with all the crooked offsets (if any) so we can just read the data.
+    while (true) {
+      // Read one bitmask and corresponding values.
+      for (int i = 0, tmpCountToRead = rowCountToRead, tmpOffset = destOffset;
+          i < currentBitmaskSize && tmpCountToRead > 0; ++i) {
+        byte b = chunkBuffer.get(currentBitmaskOffset + i);
+        int bitsToRead = Math.min(tmpCountToRead, 8);
+        copyBitsFromByte(b & 0xff, isNull, tmpOffset, 0, bitsToRead);
+        if (DebugUtils.isTraceDataEnabled()) {
+          Llap.LOG.info("Copied " + bitsToRead + " bits from " + b + " ("
+              + Integer.toBinaryString(b & 0xff) + ") at " + (currentBitmaskOffset + i)
+              + " to " + tmpOffset + "; current state is " + DebugUtils.toString(isNull));
+        }
+        tmpOffset += bitsToRead;
+        tmpCountToRead -= bitsToRead;
+      }
+      int valuesToRead = Math.min(currentBitmaskSize << 3, rowCountToRead);
+      int valuesOffset = currentBitmaskOffset + currentBitmaskSize;
+      valueHelper.copyValues(destOffset, valuesOffset, 0, valuesToRead);
+      destOffset += valuesToRead;
+      rowCountToRead -= valuesToRead;
+      if (rowCountToRead == 0) break;
+
+      ++bitmasksSkipped;
+      currentBitmaskOffset += sizeOfBitmaskAndValues;
+      currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+    }
+    return destOffset;
+  }
+
+  /** Helper interface to share the parts that deal with bitmasks, esp.
+   * the insane offset logic, between method copying various datatypes. */
+  private interface ValueCopier {
+    /** Sets the chunk view that values are read from. */
+    void initSrc(ByteBuffer chunkBuffer);
+
+    /**
+     * Copies {@code valuesToCopy} values, starting {@code valuesToSkip} values after byte offset
+     * {@code valuesOffsetBytes}, to the destination array starting at {@code destOffset}.
+     */
+    void copyValues(int destOffset,
+        int valuesOffsetBytes, int valuesToSkip, int valuesToCopy);
+  }
+
+  /** ValueCopier for 8-byte long values. */
+  private static class LongCopier implements ValueCopier {
+    LongBuffer dataBuffer = null;
+    long[] dest;
+    public void initDest(long[] dest) {
+      this.dest = dest;
+    }
+    @Override
+    public void initSrc(ByteBuffer chunkBuffer) {
+      dataBuffer = chunkBuffer.asLongBuffer();
+    }
+
+    @Override
+    public void copyValues(int destOffset,
+        int valuesOffsetBytes, int valuesToSkip, int valuesToCopy) {
+      dataBuffer.position((valuesOffsetBytes >>> 3) + valuesToSkip);
+      dataBuffer.get(dest, destOffset, valuesToCopy);
+      if (DebugUtils.isTraceDataEnabled()) {
+        Llap.LOG.info("After copying " + valuesToCopy + " from " + valuesOffsetBytes +
+            " (skip " + valuesToSkip + ", long offset "+ ((valuesOffsetBytes >>> 3) + valuesToSkip)
+            + ") to " + destOffset + ", values are " + DebugUtils.toString(dest, destOffset,
+            valuesToCopy) + " and dest is " + DebugUtils.toString(dest, 0, dest.length));
+      }
+    }
+  }
+
+  /** ValueCopier for 8-byte double values. */
+  private static class DoubleCopier implements ValueCopier {
+    DoubleBuffer dataBuffer = null;
+    double[] dest;
+    public void initDest(double[] dest) {
+      this.dest = dest;
+    }
+    @Override
+    public void initSrc(ByteBuffer chunkBuffer) {
+      dataBuffer = chunkBuffer.asDoubleBuffer();
+    }
+
+    @Override
+    public void copyValues(int destOffset,
+        int valuesOffsetBytes, int valuesToSkip, int valuesToCopy) {
+      valuesOffsetBytes += (valuesToSkip << 3);
+      dataBuffer.position(valuesOffsetBytes >>> 3);
+      dataBuffer.get(dest, destOffset, valuesToCopy);
+    }
+  }
+  private LongCopier longCopier = new LongCopier();
+  private DoubleCopier doubleCopier = new DoubleCopier();
+
+  /**
+   * @param skipped Number of full bitmasks preceding the one of interest.
+   * @param segmentRowCount Total rows in the segment.
+   * @return Size in bytes of that bitmask: BITMASK_SIZE_BYTES if it is full, otherwise the bytes
+   *     needed for the remaining rows rounded up to a multiple of 8.
+   */
+  private int determineBitmaskSizeBytes(int skipped, int segmentRowCount) {
+    int adjustedRowCount = segmentRowCount - ChunkUtils.BITMASK_SIZE_BITS * skipped;
+    if (adjustedRowCount >= ChunkUtils.BITMASK_SIZE_BITS) return ChunkUtils.BITMASK_SIZE_BYTES;
+    return ChunkUtils.align8(ChunkUtils.bitMaskSize(adjustedRowCount));
+  }
+
+  /**
+   * @return The long stored right after the first segment's header; presumably only meaningful
+   *     when isSameValue() is true for a REPEATING_VALUE segment.
+   */
+  @Override
+  public long getLong() {
+    return chunkBuffers.get(0).getLong(segmentOffsets.get(0) + 8);
+  }
+
+  /**
+   * @return The double stored right after the first segment's header; presumably only
+   *     meaningful when isSameValue() is true for a REPEATING_VALUE segment.
+   */
+  @Override
+  public double getDouble() {
+    return chunkBuffers.get(0).getDouble(segmentOffsets.get(0) + 8);
+  }
+
+  /**
+   * Writes {@code bitCount} bits of {@code b}, most-significant first and after skipping the top
+   * {@code skipBits} bits, into {@code dest} starting at {@code offset}.
+   */
+  private void copyBitsFromByte(int b, boolean[] dest, int offset, int skipBits, int bitCount) {
+    // TODO: we could unroll the loop for full-byte copy.
+    int shift = 7 - skipBits;
+    for (int i = 0; i < bitCount; ++i, --shift) {
+      dest[offset++] = (b & (1 << shift)) != 0;
+    }
+  }
+
+  // TODO: add support for Decimal and Binary
+
+  @Override
+  public Decimal128 getDecimal() {
+    throw new UnsupportedOperationException("Decimal not currently supported");
+  }
+
+  @Override
+  public void copyDecimals(Decimal128[] dest, boolean[] isNull, int offset) {
+    throw new UnsupportedOperationException("Decimal not currently supported");
+  }
+
+  @Override
+  public byte[] getBytes() {
+    throw new UnsupportedOperationException("Binary not currently supported");
+  }
+
+  @Override
+  public void copyBytes(byte[][] dest, int[] destStarts, int[] destLengths,
+      boolean[] isNull, int offset) {
+    throw new UnsupportedOperationException("Binary not currently supported");
+  }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.chunk;
+
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+
+/** Shared and utility methods for ChunkReader and ChunkWriter. */
+public class ChunkUtils {
+  /** Size in bytes of one full null bitmask; it covers BITMASK_SIZE_BITS rows. */
+  public final static int BITMASK_SIZE_BYTES = 8;
+  public final static int BITMASK_SIZE_BITS = BITMASK_SIZE_BYTES * 8;
+
+  /** Version byte expected at the start of every chunk. */
+  public static final byte FORMAT_VERSION = 0;
+  /**
+   * Fixed per-value size in bytes, indexed by Type.value(); -1 marks unsupported types.
+   * The array length assumes BINARY has the largest Type value.
+   */
+  public static final byte[] TYPE_SIZES = new byte[Type.BINARY.value() + 1];
+  static {
+    TYPE_SIZES[Type.BINARY.value()] = -1; // TODO: add support for binary
+    TYPE_SIZES[Type.DECIMAL.value()] = -1; // TODO: add support for decimal
+    TYPE_SIZES[Type.LONG.value()] = 8;
+    TYPE_SIZES[Type.DOUBLE.value()] = 8;
+  }
+
+  /** Segment encodings; the numeric value is the type byte stored in the segment header. */
+  public static enum RleSegmentType {
+    INVALID(0),
+    REPEATING_NULL(1),
+    REPEATING_VALUE(2),
+    UNIQUE_NOT_NULL(3),
+    UNIQUE_NULL_BITMASK(4);
+
+    private final byte value;
+
+    private RleSegmentType(int val) {
+      assert val >= Byte.MIN_VALUE && val <= Byte.MAX_VALUE;
+      this.value = (byte)val;
+    }
+
+    /** Lookup table from stored type byte to constant; values are contiguous from 0. */
+    private static final RleSegmentType[] BY_VALUE =
+        new RleSegmentType[UNIQUE_NULL_BITMASK.value + 1];
+    static {
+      for (RleSegmentType type : RleSegmentType.values()) {
+        BY_VALUE[type.value] = type;
+      }
+    }
+
+    /**
+     * @param value Stored segment type byte.
+     * @return The matching segment type (INVALID for 0).
+     * @throws IllegalArgumentException if value does not correspond to any segment type,
+     *     e.g. when reading a corrupt chunk.
+     */
+    public static RleSegmentType fromInt(int value) {
+      if (value < 0 || value >= BY_VALUE.length) {
+        throw new IllegalArgumentException("Unknown segment type " + value);
+      }
+      return BY_VALUE[value];
+    }
+
+    /** @return The type byte for this segment type. */
+    public byte getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * @return Size in bytes of a segment's data (excluding its 8-byte header): nothing for
+   *     repeated nulls, one value for a repeated value, one value per row otherwise, plus the
+   *     8-aligned bitmask bytes for segments with nulls.
+   * @throws AssertionError for an unsupported segment type.
+   */
+  public static int getSegmentDataSize(Type type, RleSegmentType segmentType, int rowCount) {
+    int valueSize = TYPE_SIZES[type.value()];
+    switch (segmentType) {
+    case REPEATING_NULL: return 0;
+    case REPEATING_VALUE: return valueSize;
+    case UNIQUE_NOT_NULL: return valueSize * rowCount;
+    case UNIQUE_NULL_BITMASK: return valueSize * rowCount + align8(bitMaskSize(rowCount));
+    default: throw new AssertionError("Unsupported segment type " + segmentType);
+    }
+  }
+
+  /** @return Bytes needed for one bit per row, i.e. ceil(rowCount / 8). */
+  public static int bitMaskSize(int rowCount) {
+    return (rowCount >>> 3) + (((rowCount & 7) > 0) ? 1 : 0);
+  }
+
+  /** @return number rounded up to the next multiple of 8. */
+  public static int align8(int number) {
+    int rem = number & 7;
+    return number - rem + (rem == 0 ? 0 : 8);
+  }
+
+  /** @return number rounded up to the next multiple of 64. */
+  public static int align64(int number) {
+    int rem = number & 63;
+    return number - rem + (rem == 0 ? 0 : 64);
+  }
+
+  /**
+   * @return Offset of the first value relative to the segment header.
+   * NOTE(review): for UNIQUE_NULL_BITMASK this assumes all bitmask bytes precede the values,
+   * which matches the interspersed layout read by ChunkReader only when rowCount is at most
+   * BITMASK_SIZE_BITS - confirm against callers.
+   */
+  public static int getNonRepeatingValuesOffset(RleSegmentType type, int rowCount) {
+    if (type == RleSegmentType.UNIQUE_NULL_BITMASK) {
+      return 8 + align8(bitMaskSize(rowCount));
+    }
+    return 8;
+  }
+
+  /**
+   * @param sizeOf Size of one value in bytes.
+   * @return Bytes taken by one full bitmask plus the BITMASK_SIZE_BITS values it covers.
+   */
+  public static int getFullBitmaskSize(int sizeOf) {
+    return BITMASK_SIZE_BYTES * (1 + (sizeOf << 3));
+  }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.chunk;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils.RleSegmentType;
+import org.apache.hadoop.hive.llap.loader.BufferInProgress;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+
+/**
+ * Chunk writer. Reusable, not thread safe. See ChunkReader for format details.
+ * <p>
+ * The writer appends RLE segments to the buffer it was prepare()-d with. Each segment starts
+ * with an 8-byte slot that receives the segment type (1 byte) and row count (int) when the
+ * segment is finished. Unique-value segments that may contain nulls interleave null bitmasks
+ * (1 = null, 0 = value, most significant bit first) with the values they describe; null rows
+ * still occupy a value slot.
+ */
+public class ChunkWriterImpl implements ChunkWriter {
+  /** Buffer currently being written to; set by prepare(). */
+  private BufferInProgress colBuffer;
+
+  /**
+   * Binds the writer to a buffer. If the buffer's current chunk is new (empty), reserves
+   * 8 bytes for the chunk header that finishChunk() fills in later.
+   * @param colBuffer Buffer to write into.
+   */
+  public void prepare(BufferInProgress colBuffer) {
+    this.colBuffer = colBuffer;
+    Chunk chunk = this.colBuffer.ensureChunk();
+    if (chunk.length == 0) {
+      // This is a new chunk; reserve space for header.
+      colBuffer.offset += 8;
+      chunk.length = 8;
+    }
+    valuesOffset = colBuffer.offset;
+  }
+
+  /**
+   * Completes the chunk by writing the header with externally-tracked row count: the format
+   * version byte at the chunk start, followed by the row count.
+   * Does not have to be prepare()-d - any chunk can be updated via this method.
+   * @param chunk Chunk to update.
+   * @param rowCount Row count in the chunk.
+   */
+  public void finishChunk(Chunk chunk, int rowCount) {
+    // The space for chunk start is reserved; no need to update offset or length.
+    assert currentSegmentStart == -1;
+    ByteBuffer buf = chunk.buffer.getContents();
+    buf.put(chunk.offset, ChunkUtils.FORMAT_VERSION);
+    buf.putInt(chunk.offset + 1, rowCount);
+  }
+
+  // State of current segment. currentSegmentStart is -1 when no segment is open.
+  private int currentSegmentStart = -1, currentSegmentValues = -1;
+  private boolean currentSegmentIsRepeating = false, currentSegmentHasNulls = false;
+  private long currentRepeatingLongValue = -1;
+  /** Size in bytes of one value in the current segment; -1 for repeated-null segments. */
+  private int currentSizeOf = -1;
+
+  // State of the unique segment currently being written. valuesOffset is the absolute buffer
+  // offset at which the next value (or the next segment header) will be written.
+  private int currentBitmaskOffset = -1, currentBitmaskLimit = -1,
+      valuesOffset = -1, valuesSinceBitmask = -1;
+
+  @Override
+  public int estimateValueCountThatFits(Type type, boolean hasNulls) {
+    // Assume we'd only need to write unique values without nulls, we can always do that.
+    // If we are in the middle of a bitmask segment, space for bitmask was already reserved
+    // so values will take just as much space as without bitmask.
+    // Caller is supposed to re-estimate after every write.
+    return (colBuffer.getSpaceLeft(valuesOffset) - 8) / ChunkUtils.TYPE_SIZES[type.value()];
+  }
+
+  @Override
+  public void writeLongs(long[] src, int srcOffset, int srcCount, NullsState nullsState) {
+    writeLongsInternal(src, srcOffset, srcCount, nullsState, true);
+  }
+
+  @Override
+  public void writeLongs(byte[] src, int srcOffset, int srcCount, NullsState nullsState) {
+    writeLongsInternal(src, srcOffset, srcCount, nullsState, false);
+  }
+
+  /**
+   * Writes non-null unique long values from either a long[] or a byte[] source (each byte is
+   * widened to a long), filling bitmasks with zeroes if the current segment has bitmasks.
+   */
+  private void writeLongsInternal(
+      Object srcObj, int srcOffset, int srcCount, NullsState nullsState, boolean isLongSrc) {
+    long[] srcL = isLongSrc ? (long[])srcObj : null;
+    byte[] srcB = isLongSrc ? null : (byte[])srcObj;
+    ByteBuffer buffer = colBuffer.buffer.getContents();
+    currentSizeOf = ChunkUtils.TYPE_SIZES[Type.LONG.value()];
+    ensureUniqueValueSegment(buffer, srcCount, nullsState);
+    if (!currentSegmentHasNulls) {
+      valuesOffset = isLongSrc
+          ? writeLongs(buffer, valuesOffset, srcL, srcOffset, srcCount, currentSizeOf)
+          : writeLongs(buffer, valuesOffset, srcB, srcOffset, srcCount, currentSizeOf);
+      currentSegmentValues += srcCount;
+    } else {
+      if (valuesSinceBitmask == currentBitmaskLimit) {
+        startNextBitmask(buffer);
+      }
+      // Write bitmasks followed by values, until we write all the values.
+      while (srcCount > 0) {
+        int valuesToWrite = Math.min(currentBitmaskLimit - valuesSinceBitmask, srcCount);
+        assert valuesToWrite > 0 : valuesSinceBitmask + "/" + currentBitmaskLimit + " " + srcCount;
+        writeZeroesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, valuesToWrite);
+        valuesOffset = isLongSrc
+            ? writeLongs(buffer, valuesOffset, srcL, srcOffset, valuesToWrite, currentSizeOf)
+            : writeLongs(buffer, valuesOffset, srcB, srcOffset, valuesToWrite, currentSizeOf);
+        valuesSinceBitmask += valuesToWrite;
+        currentSegmentValues += valuesToWrite;
+        srcOffset += valuesToWrite;
+        srcCount -= valuesToWrite;
+        if (srcCount > 0) {
+          assert valuesSinceBitmask == currentBitmaskLimit;
+          startNextBitmask(buffer);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void writeDoubles(double[] src, int srcOffset, int srcCount, NullsState nullsState) {
+    ByteBuffer buffer = colBuffer.buffer.getContents();
+    currentSizeOf = ChunkUtils.TYPE_SIZES[Type.DOUBLE.value()];
+    ensureUniqueValueSegment(buffer, srcCount, nullsState);
+    if (!currentSegmentHasNulls) {
+      valuesOffset = writeDoubles(buffer, valuesOffset, src, srcOffset, srcCount, currentSizeOf);
+      currentSegmentValues += srcCount;
+    } else {
+      if (valuesSinceBitmask == currentBitmaskLimit) {
+        startNextBitmask(buffer);
+      }
+      // Write bitmasks followed by values, until we write all the values.
+      while (srcCount > 0) {
+        int valuesToWrite = Math.min(currentBitmaskLimit - valuesSinceBitmask, srcCount);
+        assert valuesToWrite > 0;
+        writeZeroesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, valuesToWrite);
+        valuesOffset = writeDoubles(
+            buffer, valuesOffset, src, srcOffset, valuesToWrite, currentSizeOf);
+        valuesSinceBitmask += valuesToWrite;
+        currentSegmentValues += valuesToWrite;
+        srcOffset += valuesToWrite;
+        srcCount -= valuesToWrite;
+        if (srcCount > 0) {
+          assert valuesSinceBitmask == currentBitmaskLimit;
+          startNextBitmask(buffer);
+        }
+      }
+    }
+  }
+
+  /**
+   * Writes count nulls. If a bitmask segment is open, its current bitmask is filled first
+   * (null rows still take a value slot); any remainder goes into a repeated-null segment.
+   * followedByNonNull is currently not used.
+   */
+  @Override
+  public void writeNulls(int count, boolean followedByNonNull) {
+    if (!currentSegmentHasNulls || (!currentSegmentIsRepeating
+        && valuesSinceBitmask == 0 && count > END_UNIQUE_SEGMENT_RUN_LEN)) {
+      finishCurrentSegment();
+    }
+    if (currentSegmentStart != -1 && !currentSegmentIsRepeating) {
+      assert currentSegmentHasNulls && valuesSinceBitmask > 0;
+      assert currentSizeOf > 0;
+      ByteBuffer buffer = colBuffer.buffer.getContents();
+      // We are writing into an existing segment with bitmasks. Currently, we arbitrarily
+      // choose to finish writing bitmask in such cases. Given that we write values despite
+      // bitmask, this may be suboptimal, but otherwise we might write million tiny segments.
+      int valuesToWrite = Math.min(currentBitmaskLimit - valuesSinceBitmask, count);
+      if (valuesToWrite > 0) {
+        writeOnesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, valuesToWrite);
+        valuesOffset += (currentSizeOf * valuesToWrite);
+        valuesSinceBitmask += valuesToWrite;
+        currentSegmentValues += valuesToWrite;
+        count -= valuesToWrite;
+      }
+      if (count == 0) return;
+      // Might not make sense if count remaining is low and is followed by non-nulls.
+      finishCurrentSegment();
+    }
+    if (currentSegmentStart == -1) {
+      // We have no segment. For small count, starting a bitmask might make sense, but for now
+      // we always start repeated nulls segment, even if it doesn't make any sense.
+      currentSizeOf = -1;
+      startRepeatingSegment();
+      currentSegmentHasNulls = true;
+    }
+    assert currentSegmentIsRepeating && currentSegmentHasNulls;
+    currentSegmentValues += count;
+  }
+
+  /** Arbitrary; the tradeoff is between wasting space writing repeated values,
+   * and having many tiny segments that are more expensive to read. */
+  private static final int END_UNIQUE_SEGMENT_RUN_LEN = 10;
+
+  /**
+   * Writes count copies of a long value. Short runs are appended to an open unique segment;
+   * otherwise a repeated-value segment (value stored once) is used or extended.
+   * nullsState is currently not used.
+   */
+  @Override
+  public void writeRepeatedLongs(long value, int count, NullsState nullsState) {
+    boolean isIncompatibleRepeating = currentSegmentIsRepeating
+        && (currentSegmentHasNulls || currentRepeatingLongValue != value);
+    boolean isAtEndOfBitmask = !currentSegmentIsRepeating
+        && currentSegmentHasNulls && valuesSinceBitmask == 0;
+    if (isIncompatibleRepeating || (isAtEndOfBitmask && count >= END_UNIQUE_SEGMENT_RUN_LEN)) {
+      finishCurrentSegment();
+    }
+    if (currentSegmentStart != -1 && !currentSegmentIsRepeating) {
+      assert currentSizeOf > 0;
+      ByteBuffer buffer = colBuffer.buffer.getContents();
+      if (currentSegmentHasNulls) {
+        assert valuesSinceBitmask > 0;
+        // See writeNulls - similar logic.
+        int valuesToWrite = Math.min(currentBitmaskLimit - valuesSinceBitmask, count);
+        if (valuesToWrite > 0) {
+          writeZeroesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, valuesToWrite);
+          valuesOffset = writeLongs(buffer, valuesOffset, value, valuesToWrite, currentSizeOf);
+          valuesSinceBitmask += valuesToWrite;
+          currentSegmentValues += valuesToWrite;
+          count -= valuesToWrite;
+        }
+        if (count > 0) {
+          finishCurrentSegment();
+        }
+      } else if (count < END_UNIQUE_SEGMENT_RUN_LEN) {
+        valuesOffset = writeLongs(buffer, valuesOffset, value, count, currentSizeOf);
+        valuesSinceBitmask += count;
+        currentSegmentValues += count;
+        count = 0;
+      } else {
+        finishCurrentSegment();
+      }
+    }
+    if (count == 0) return;
+    if (currentSegmentStart == -1) {
+      // We have no segment. For small count, starting a bitmask might make sense, but for now
+      // we always start repeated segment, even if it doesn't make any sense.
+      currentSizeOf = ChunkUtils.TYPE_SIZES[Type.LONG.value()];
+      startRepeatingSegment();
+      currentSegmentHasNulls = false;
+      currentRepeatingLongValue = value;
+      colBuffer.buffer.getContents().putLong(valuesOffset, value);
+      valuesOffset += currentSizeOf;
+    }
+    assert currentSegmentIsRepeating && (currentRepeatingLongValue == value);
+    currentSegmentValues += count;
+  }
+
+  /**
+   * Closes the open segment (if any): shrinks a partially-filled last bitmask, writes the
+   * segment header, and advances the column buffer past the segment.
+   */
+  @Override
+  public void finishCurrentSegment() {
+    if (currentSegmentStart == -1) return;
+    ByteBuffer buffer = colBuffer.buffer.getContents();
+    RleSegmentType segmentType = RleSegmentType.INVALID;
+    if (currentSegmentIsRepeating || !currentSegmentHasNulls || valuesSinceBitmask == 0
+        || valuesSinceBitmask == currentBitmaskLimit) {
+      // Simple case - just write the type and count into current segment header.
+      segmentType = currentSegmentIsRepeating ? (currentSegmentHasNulls
+          ? RleSegmentType.REPEATING_NULL : RleSegmentType.REPEATING_VALUE)
+          : (currentSegmentHasNulls
+              ? RleSegmentType.UNIQUE_NULL_BITMASK : RleSegmentType.UNIQUE_NOT_NULL);
+    } else {
+      // Complicated case - bitmask is not finished. Space was reserved for a bitmask covering
+      // currentBitmaskLimit values; shrink it to valuesSinceBitmask values (rounded up to 64,
+      // i.e. whole 8-byte words) and move the values that follow it back.
+      segmentType = RleSegmentType.UNIQUE_NULL_BITMASK;
+      int adjustedValues = ChunkUtils.align64(valuesSinceBitmask); // Rounded to 8 bytes.
+      int bytesShift = (ChunkUtils.align64(currentBitmaskLimit) - adjustedValues) >>> 3;
+      // Will never happen when bitmask is 8 bytes - minimum and maximum sizes are the same.
+      if (bytesShift > 0) {
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Adjusting last bitmask by " + bytesShift + " bytes");
+        }
+        assert currentSizeOf > 0;
+        int valuesSize = valuesSinceBitmask * currentSizeOf;
+        int valuesStart = valuesOffset - valuesSize;
+        moveBytesBack(buffer, valuesStart, bytesShift, valuesSize);
+        valuesOffset -= bytesShift;
+      }
+    }
+    if (DebugUtils.isTraceEnabled()) {
+      Llap.LOG.info("Writing " + segmentType + " header w/ " + currentSegmentValues
+          + " values at " + currentSegmentStart + " till " + valuesOffset);
+    }
+    writeSegmentHeader(buffer, currentSegmentStart, segmentType, currentSegmentValues);
+    colBuffer.update(valuesOffset, currentSegmentValues);
+    currentSegmentStart = -1;
+  }
+
+  /**
+   * Moves length bytes at absolute buffer index from to absolute index (from - shift).
+   * Works for heap buffers (honoring a non-zero arrayOffset(), e.g. for slices) and for
+   * buffers without an accessible array (e.g. direct buffers).
+   */
+  private static void moveBytesBack(ByteBuffer buffer, int from, int shift, int length) {
+    if (buffer.hasArray()) {
+      // Absolute ByteBuffer indices map to arrayOffset() + index in the backing array.
+      byte[] arr = buffer.array();
+      int base = buffer.arrayOffset();
+      System.arraycopy(arr, base + from, arr, base + from - shift, length);
+    } else {
+      // Destination precedes the source, so a front-to-back copy never reads overwritten bytes.
+      for (int i = 0; i < length; ++i) {
+        buffer.put(from - shift + i, buffer.get(from + i));
+      }
+    }
+  }
+
+  /** Opens a repeated segment at valuesOffset, reserving its 8-byte header. */
+  private void startRepeatingSegment() {
+    currentSegmentIsRepeating = true;
+    currentSegmentStart = valuesOffset;
+    currentSegmentValues = 0;
+    currentBitmaskOffset = -1;
+    valuesOffset += 8;
+  }
+
+  /**
+   * Makes sure a unique-value segment is open that can take valueCount more values. An open
+   * bitmask segment that cannot fit them is finished and replaced by one without bitmasks.
+   */
+  private void ensureUniqueValueSegment(ByteBuffer buffer, int valueCount, NullsState nullsState) {
+    boolean forceNoNulls = false;
+    if (currentSegmentStart != -1) {
+      if (!currentSegmentIsRepeating) {
+        if (!currentSegmentHasNulls
+            || canValuesFitIntoCurrentSegment(buffer, valueCount, currentSizeOf)) {
+          return; // We have an unique-value segment w/o bitmasks, or values fit w/bitmasks.
+        }
+        forceNoNulls = true;
+      }
+      finishCurrentSegment();
+    }
+    // There no unique-value segment (or we just finished one), start one.
+    currentSegmentStart = valuesOffset;
+    valuesOffset += 8;
+    currentSegmentIsRepeating = false;
+    valuesSinceBitmask = currentSegmentValues = 0;
+    currentSegmentHasNulls = !forceNoNulls && shouldNewSegmentHaveBitmasks(
+        valueCount, nullsState, buffer, valuesOffset, currentSizeOf);
+    if (!currentSegmentHasNulls) {
+      currentBitmaskOffset = -1;
+    } else {
+      startNextBitmask(buffer);
+    }
+  }
+
+  /**
+   * Reserves a bitmask at valuesOffset. Uses a full-size bitmask when it (and its values) fit
+   * before the buffer limit; otherwise a smaller one sized to the remaining space.
+   */
+  private void startNextBitmask(ByteBuffer buffer) {
+    currentBitmaskOffset = valuesOffset;
+    int spaceLeft = buffer.limit() - currentBitmaskOffset;
+    valuesSinceBitmask = 0;
+    if (spaceLeft >= ChunkUtils.getFullBitmaskSize(currentSizeOf)) {
+      // Most of the time, standard-sized bitmask will fit (we are not at the end of the buffer).
+      currentBitmaskLimit = ChunkUtils.BITMASK_SIZE_BITS;
+      valuesOffset += ChunkUtils.BITMASK_SIZE_BYTES;
+      return;
+    }
+    // Only part of the bitmask will fit, so we will have a smaller one. Each increment is
+    // 8 bytes of bitmask plus space for the 64 values it covers (minimum bitmask alignment).
+    int incrementSize = 8 + (currentSizeOf << 6);
+    int incrementsThatFit = (spaceLeft / incrementSize);
+    valuesOffset += (incrementsThatFit << 3);
+    // Each whole increment covers 64 values.
+    currentBitmaskLimit = incrementsThatFit << 6;
+    // If there's more space, try to fit 8 more bytes of bitmask with less than 64 values.
+    spaceLeft = (spaceLeft % incrementSize) - 8; // 8 bytes for that last bitmask
+    if (spaceLeft >= currentSizeOf) {
+      valuesOffset += 8;
+      currentBitmaskLimit += (spaceLeft / currentSizeOf);
+    }
+    if (currentBitmaskLimit == 0) {
+      throw new AssertionError("Bitmask won't fit; caller should have checked that");
+    }
+  }
+
+  /** Clears the bitmask bits for valuesToWrite values starting at bit valuesSinceBitmask. */
+  private static void writeZeroesIntoBytes(
+      ByteBuffer buffer, int bitmaskOffset, int valuesSinceBitmask, int valuesToWrite) {
+    // No need to write 0s into the tail of a partial byte - already set to 0s by
+    // the previous call to writeZeroesIntoBytes or writeOnesIntoBytes.
+    int bitsToSkip = 8 - (valuesSinceBitmask & 7);
+    if (bitsToSkip < 8) {
+      valuesToWrite -= bitsToSkip;
+      valuesSinceBitmask += bitsToSkip;
+      if (valuesToWrite <= 0) return;
+    }
+    int nextByteToModify = bitmaskOffset + (valuesSinceBitmask >>> 3);
+    while (valuesToWrite > 0) {
+      buffer.put(nextByteToModify, (byte)0);
+      valuesToWrite -= 8;
+      ++nextByteToModify;
+    }
+  }
+
+  /** Sets the bitmask bits (MSB first) for valuesToWrite values starting at valuesSinceBitmask. */
+  private static void writeOnesIntoBytes(
+      ByteBuffer buffer, int bitmaskOffset, int valuesSinceBitmask, int valuesToWrite) {
+    // We need to set the bits in the bitmask to one. We may have to do partial bits,
+    // then whole bytes, then partial bits again for the last rows. I hate bits.
+    int nextByteToModify = bitmaskOffset + (valuesSinceBitmask >>> 3);
+    int bitsWritten = writeOneBitsFromPartialByte(
+        buffer, nextByteToModify, valuesSinceBitmask, valuesToWrite);
+    if (bitsWritten > 0) {
+      valuesToWrite -= bitsWritten;
+      ++nextByteToModify;
+    }
+    while (valuesToWrite >= 8) {
+      buffer.put(nextByteToModify, (byte)0xff);
+      valuesToWrite -= 8;
+      ++nextByteToModify;
+    }
+    if (valuesToWrite > 0) {
+      // Whole-byte write: the top bits are set and the remaining low bits are cleared.
+      int newBitsMask = ((1 << valuesToWrite) - 1) << (8 - valuesToWrite);
+      buffer.put(nextByteToModify, (byte)newBitsMask);
+    }
+  }
+
+  /**
+   * Sets bits in the already-started byte at bufferOffset, if valuesInBitmask is not
+   * byte-aligned. Returns the number of bits set (0 when aligned).
+   */
+  private static int writeOneBitsFromPartialByte(
+      ByteBuffer buffer, int bufferOffset, int valuesInBitmask, int valuesToWrite) {
+    int bitOffset = valuesInBitmask & 7;
+    if (bitOffset == 0) return 0;
+    assert bitOffset < 8;
+    byte partialByte = buffer.get(bufferOffset);
+    int unusedBits = 8 - bitOffset;
+    int result = Math.min(unusedBits, valuesToWrite);
+    // Make newBitCount ones, then shift them to create 0s on the right.
+    int newBitsMask = ((1 << result) - 1) << (unusedBits - result);
+    byte newByte = (byte)(partialByte | newBitsMask);
+    buffer.put(bufferOffset, newByte);
+    return result;
+  }
+
+  private static boolean shouldNewSegmentHaveBitmasks(
+      int valueCount, NullsState nullsState, ByteBuffer buffer, int offset, int sizeOf) {
+    // This is rather arbitrary. We'll write some values w/o bitmask if there are enough.
+    // What is enough is an interesting question. We pay 8 bytes extra for segment header
+    // if this is immediately followed by some nulls; so use this as a guideline. If we
+    // don't know if this is followed by null or more values, use half?
+    if (nullsState == NullsState.NO_NULLS || valueCount >= ChunkUtils.BITMASK_SIZE_BITS) {
+      return false;
+    }
+    return canValuesFitWithBitmasks(buffer, offset, valueCount, sizeOf);
+  }
+
+  /** Whether valueCount more values fit into the open bitmask segment, incl. new bitmasks. */
+  private boolean canValuesFitIntoCurrentSegment(ByteBuffer buffer, int valueCount, int sizeOf) {
+    int valuesIntoCurrentBitmask = (currentBitmaskLimit - valuesSinceBitmask);
+    valueCount -= valuesIntoCurrentBitmask;
+    if (valueCount <= 0) return true;
+    int nextBitmaskOffset = valuesOffset + valuesIntoCurrentBitmask * sizeOf;
+    return canValuesFitWithBitmasks(buffer, nextBitmaskOffset, valueCount, sizeOf);
+  }
+
+  private static boolean canValuesFitWithBitmasks(
+      ByteBuffer buffer, int offset, int valueCount, int elementSize) {
+    return (determineSizeWithBitMask(valueCount, elementSize) < (buffer.limit() - offset));
+  }
+
+  /** Bytes for count values plus a bitmask rounded up to whole 8-byte words. */
+  private static int determineSizeWithBitMask(int count, int elementSize) {
+    return count * elementSize + (ChunkUtils.align64(count) >>> 3);
+  }
+
+  /** Writes the segment type byte followed by the row count into the segment header slot. */
+  private static void writeSegmentHeader(
+      ByteBuffer buffer, int offset, RleSegmentType type, int rowCount) {
+    buffer.put(offset++, type.getValue());
+    buffer.putInt(offset, rowCount);
+  }
+
+  private static int writeLongs(
+      ByteBuffer buffer, int offset, long[] cv, int cvOffset, int rowsToWrite, int sizeOf) {
+    assert sizeOf > 0;
+    for (int i = 0; i < rowsToWrite; ++i) {
+      buffer.putLong(offset, cv[cvOffset + i]);
+      offset += sizeOf;
+    }
+    return offset;
+  }
+
+  private static int writeDoubles(
+      ByteBuffer buffer, int offset, double[] cv, int cvOffset, int rowsToWrite, int sizeOf) {
+    assert sizeOf > 0;
+    for (int i = 0; i < rowsToWrite; ++i) {
+      buffer.putDouble(offset, cv[cvOffset + i]);
+      offset += sizeOf;
+    }
+    return offset;
+  }
+
+  /** Writes bytes from cv as longs (each byte widened to a long). */
+  private static int writeLongs(
+      ByteBuffer buffer, int offset, byte[] cv, int cvOffset, int rowsToWrite, int sizeOf) {
+    assert sizeOf > 0;
+    for (int i = 0; i < rowsToWrite; ++i) {
+      buffer.putLong(offset, cv[cvOffset + i]);
+      offset += sizeOf;
+    }
+    return offset;
+  }
+
+  /** Writes the same long value rowsToWrite times. */
+  private static int writeLongs(
+      ByteBuffer buffer, int offset, long value, int rowsToWrite, int sizeOf) {
+    for (int i = 0; i < rowsToWrite; ++i) {
+      buffer.putLong(offset, value);
+      offset += sizeOf;
+    }
+    return offset;
+  }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.loader;
+
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+
+/**
+ * Helper struct that is used by loaders (e.g. OrcLoader) and chunk writer to write chunks.
+ */
+public class BufferInProgress {
+ /** Buffer that is being written to. */
+ public final WeakBuffer buffer;
+ /** Offset in buffer where writing can proceed */
+ public int offset; // TODO: use WB's position; these have separate lifecycle now, needed?
+ private final int bufferLimit;
+
+ /** The chunk that is currently being written. */
+ private Chunk chunkInProgress = null;
+ /** The row count of the chunk currently being written. */
+ private int chunkInProgressRows = 0;
+
+ public BufferInProgress(WeakBuffer buffer) {
+ this.buffer = buffer;
+ this.bufferLimit = buffer.getContents().limit();
+ this.offset = 0;
+ }
+
+ public Chunk ensureChunk() {
+ if (chunkInProgress == null) {
+ chunkInProgress = new Chunk(buffer, offset, 0);
+ chunkInProgressRows = 0;
+ }
+ return chunkInProgress;
+ }
+
+ public Chunk extractChunk() {
+ Chunk result = chunkInProgress;
+ chunkInProgress = null;
+ chunkInProgressRows = 0;
+ return result;
+ }
+
+ public void update(int newOffset, int rowsWritten) {
+ if (newOffset > bufferLimit) {
+ throw new AssertionError("Offset is beyond buffer limit: " + newOffset + "/" + bufferLimit
+ + "; previous offset " + offset + ", chunk " + chunkInProgress);
+ }
+ chunkInProgress.length += (newOffset - offset);
+ this.offset = newOffset;
+ this.chunkInProgressRows += rowsWritten;
+ }
+
+ public int getChunkInProgressRows() {
+ return chunkInProgressRows;
+ }
+
+ public int getSpaceLeft() {
+ return getSpaceLeft(-1);
+ }
+
+ public int getSpaceLeft(int offset) {
+ offset = (offset >= 0) ? offset : this.offset;
+ return buffer.getContents().limit() - offset;
+ }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.loader;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.cache.EvictionListener;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+
+/**
+ * This class contains the mapping of file chunks to buffers inside BufferPool.
+ * <p>
+ * Chunks may span several buffers (linked via Chunk.nextChunk). A low-priority daemon thread,
+ * woken by eviction notices, walks the map and removes entries whose chain fails verifyChunk
+ * (some buffer could not be locked).
+ * @param <K> Key type identifying a chunk.
+ */
+public class ChunkPool<K> implements EvictionListener {
+  private final ConcurrentHashMap<K, Chunk> chunkCache = new ConcurrentHashMap<K, Chunk>();
+
+  /** Number of unprocessed evictions, for the background thread. */
+  private final AtomicInteger newEvictions = new AtomicInteger(0);
+  private final Thread cleanupThread;
+
+  public ChunkPool() {
+    // NOTE(review): starts a thread that references this object from the constructor.
+    cleanupThread = new CleanupThread();
+    cleanupThread.start();
+  }
+
+  /**
+   * Gets a chunk from cache and locks all of its buffers.
+   * TODO: We expect that in most cases, some related chunks (e.g. columns for a stripe)
+   * will be stored in the same buffer. We could use this to get keys more efficiently.
+   * On the other hand, real stripes are pretty big.
+   * @param key key to search for.
+   * @param lockedBuffers Buffers already locked by the caller; newly locked ones are added.
+   * @return Chunk corresponding to key, locked; or null if absent or it could not be locked.
+   */
+  public Chunk getChunk(K key, HashSet<WeakBuffer> lockedBuffers) {
+    Chunk result = chunkCache.get(key);
+    while (result != null) {
+      if (lockChunk(result, lockedBuffers)) return result;
+      if (chunkCache.remove(key, result)) return null;
+      // Another thread replaced or removed the mapping; retrying the stale chunk could spin
+      // forever, so re-read the current value instead.
+      result = chunkCache.get(key);
+    }
+    return null;
+  }
+
+  /**
+   * Locks every buffer in the chunk chain that is not already in lockedBuffers. On failure,
+   * unlocks (and removes from lockedBuffers) only the buffers locked by this call.
+   * @return true if the whole chain is locked.
+   */
+  private boolean lockChunk(Chunk result, HashSet<WeakBuffer> lockedBuffers) {
+    // We expect the chain to have 1 or 2 buffers (2 if we are on buffer boundary). Keep track of
+    // what we lock in the bitmask; may need fixing (extremely unlikely - 64+ buffer, giant chunks)
+    boolean failedToLock = false;
+    long blocksToUnlock = 0;
+    // The bit indicating that current chunk was locked. This must be a long shift: the int
+    // expression 1 << 63 is Integer.MIN_VALUE, which sign-extends to 33 set bits.
+    long bit = 1L << 63;
+
+    Chunk chunk = result;
+    while (chunk != null) {
+      if (lockedBuffers.contains(chunk.buffer)) {
+        assert chunk.buffer.isLocked() : chunk.buffer + " is in lockedBuffers but is not locked";
+      } else if (chunk.buffer.lock(true)) {
+        if (DebugUtils.isTraceLockingEnabled()) {
+          Llap.LOG.info("Locked " + chunk.buffer + " for " + result);
+        }
+        lockedBuffers.add(chunk.buffer);
+        blocksToUnlock |= bit;
+      } else {
+        failedToLock = true;
+        break;
+      }
+      bit >>>= 1;
+      chunk = chunk.nextChunk;
+      if (bit == 1 && chunk != null) {
+        throw new AssertionError("Chunk chain was too long");
+      }
+    }
+    if (!failedToLock) return true;
+
+    bit = 1L << 63;
+    Chunk chunk2 = result;
+    while (chunk2 != chunk) {
+      if ((blocksToUnlock & bit) == bit) {
+        if (DebugUtils.isTraceLockingEnabled()) {
+          Llap.LOG.info("Unlocking " + chunk2.buffer + " due to failed chunk lock");
+        }
+        lockedBuffers.remove(chunk2.buffer);
+        chunk2.buffer.unlock();
+      }
+      bit >>>= 1;
+      chunk2 = chunk2.nextChunk;
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether every buffer in the chain can be locked via lock(false); any locks taken
+   * during the check are released before returning.
+   * @return true if all buffers in the chain were locked successfully.
+   */
+  private boolean verifyChunk(Chunk entry) {
+    Chunk chunk = entry;
+    while (chunk != null) {
+      if (!chunk.buffer.lock(false)) break;
+      chunk = chunk.nextChunk;
+    }
+    Chunk chunk2 = entry;
+    while (chunk2 != chunk) {
+      chunk2.buffer.unlock();
+      chunk2 = chunk2.nextChunk;
+    }
+    return chunk == null;
+  }
+
+  /**
+   * Caches val under key unless another chunk is already cached and can be locked, in which
+   * case that chunk is returned instead. Stale (unlockable) entries are replaced.
+   */
+  public Chunk addOrGetChunk(K key, Chunk val, HashSet<WeakBuffer> lockedBuffers) {
+    assert val.buffer.isLocked();
+    while (true) {
+      Chunk oldVal = chunkCache.putIfAbsent(key, val);
+      if (oldVal == null) return val;
+      if (DebugUtils.isTraceCachingEnabled()) {
+        Llap.LOG.info("Trying to cache when the chunk is already cached for "
+            + key + "; old " + oldVal + ", new " + val);
+      }
+      if (lockChunk(oldVal, lockedBuffers)) return oldVal;
+      // We found some old value but couldn't lock it; remove it.
+      chunkCache.remove(key, oldVal);
+    }
+  }
+
+  /** Counts the eviction and wakes the cleanup thread on the first unprocessed one. */
+  @Override
+  public void evictionNotice(WeakBuffer evicted) {
+    int oldValue = newEvictions.getAndIncrement();
+    if (oldValue == 0) {
+      synchronized (newEvictions) {
+        newEvictions.notifyAll();
+      }
+    }
+  }
+
+  /** A region of a buffer; chunks crossing buffer boundaries are chained via nextChunk. */
+  public static class Chunk {
+    public WeakBuffer buffer;
+    public int offset, length;
+    public Chunk nextChunk;
+
+    public Chunk(WeakBuffer buffer, int offset, int length) {
+      this.buffer = buffer;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    /** Appends another chunk at the end of this chain; returns this. */
+    public Chunk addChunk(Chunk another) {
+      // Traversing list is bad; however, we expect that this will very rarely happen; and in
+      // nearly all the cases when it does (buffer boundary) the list will have 1 element.
+      Chunk chunk = this;
+      while (chunk.nextChunk != null) {
+        chunk = chunk.nextChunk;
+      }
+      chunk.nextChunk = another;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "{" + buffer + ", " + offset + ", " + length + "}";
+    }
+
+    /** Describes every chunk in the chain, each followed by ", ". */
+    public String toFullString() {
+      StringBuilder result = new StringBuilder();
+      for (Chunk chunk = this; chunk != null; chunk = chunk.nextChunk) {
+        result.append(chunk.toString()).append(", ");
+      }
+      return result.toString();
+    }
+  }
+
+  /** Background thread that removes cache entries whose buffers are no longer lockable. */
+  private final class CleanupThread extends Thread {
+    private static final int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+    public CleanupThread() {
+      super("Llap ChunkPool cleanup thread");
+      setDaemon(true);
+      setPriority(1);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          doOneCleanupRound();
+        } catch (InterruptedException ex) {
+          Llap.LOG.warn("Cleanup thread has been interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        } catch (Throwable t) {
+          Llap.LOG.error("Cleanup has failed; the thread will now exit", t);
+          break;
+        }
+      }
+    }
+
+    /**
+     * Waits for at least one eviction, then verifies every cached chunk, spreading the work
+     * over roughly APPROX_CLEANUP_INTERVAL_SEC seconds.
+     */
+    private void doOneCleanupRound() throws InterruptedException {
+      while (true) {
+        int evictionsSinceLast = newEvictions.getAndSet(0);
+        if (evictionsSinceLast > 0) break;
+        synchronized (newEvictions) {
+          newEvictions.wait(10000);
+        }
+      }
+      // Duration is an estimate; if the size of the map changes rapidly, it can be very different.
+      long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+      int processed = 0;
+      // TODO: if this iterator affects the map in some bad way,
+      // we'd need to sleep once per round instead.
+      Iterator<Map.Entry<K, Chunk>> iter = chunkCache.entrySet().iterator();
+      while (iter.hasNext()) {
+        if (!verifyChunk(iter.next().getValue())) {
+          iter.remove();
+        }
+        ++processed;
+        int approxElementsLeft = chunkCache.size() - processed;
+        long sleepMs = (approxElementsLeft <= 0)
+            ? 1 : (endTime - System.nanoTime()) / (1000000L * approxElementsLeft);
+        // Once the round overruns endTime the estimate turns negative; Thread.sleep would throw
+        // IllegalArgumentException and terminate the thread, so clamp to zero.
+        Thread.sleep(Math.max(sleepMs, 0L));
+      }
+    }
+  }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.llap.loader;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.api.impl.VectorImpl;
+import org.apache.hadoop.hive.llap.cache.BufferPool;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriterImpl;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.apache.hadoop.hive.llap.processor.ChunkConsumer;
+import org.apache.hadoop.hive.llap.processor.ChunkProducerFeedback;
+
+// TODO: write unit tests if this class becomes less primitive.
+public abstract class Loader {
+ // For now, we have one buffer pool. Add bufferpool per load when needed.
+ private final BufferPool bufferPool;
+ private final ConcurrentLinkedQueue<BufferInProgress> reusableBuffers =
+ new ConcurrentLinkedQueue<BufferInProgress>();
+ protected final ChunkWriterImpl writer;
+
+
+ public Loader(BufferPool bufferPool) {
+ this.bufferPool = bufferPool;
+ this.writer = new ChunkWriterImpl();
+ }
+
+ protected class LoadContext implements ChunkProducerFeedback {
+ public volatile boolean isStopped = false;
+
+ @Override
+ public void returnCompleteVector(Vector vector) {
+ Loader.this.returnCompleteVector(vector);
+ }
+
+ @Override
+ public void stop() {
+ isStopped = true;
+ }
+ }
+
+ public final void load(RequestImpl request, ChunkConsumer consumer)
+ throws IOException, InterruptedException {
+ // TODO: this API is subject to change, just a placeholder. Ideally we want to refactor
+ // so that working with cache and buffer allocation/locking would be here, but right
+ // now it depends on OrcLoader (esp. locking is hard to pull out).
+ LoadContext context = new LoadContext();
+ consumer.init(context); // passed as ChunkProducerFeedback
+ loadInternal(request, consumer, context);
+ }
+
+ private void returnCompleteVector(Vector vector) {
+ VectorImpl vectorImpl = (VectorImpl)vector;
+ for (BufferPool.WeakBuffer buffer : vectorImpl.getCacheBuffers()) {
+ if (DebugUtils.isTraceLockingEnabled()) {
+ Llap.LOG.info("Unlocking " + buffer + " because reader is done");
+ }
+ buffer.unlock();
+ }
+ }
+
+ // TODO: this API is subject to change, just a placeholder.
+ protected abstract void loadInternal(RequestImpl request, ChunkConsumer consumer,
+ LoadContext context) throws IOException, InterruptedException;
+
+ protected final BufferInProgress prepareReusableBuffer(
+ HashSet<WeakBuffer> resultBuffers) throws InterruptedException {
+ while (true) {
+ BufferInProgress buf = reusableBuffers.poll();
+ if (buf == null) {
+ WeakBuffer newWb = bufferPool.allocateBuffer();
+ if (!resultBuffers.add(newWb)) {
+ throw new AssertionError("Cannot add new buffer to resultBuffers");
+ }
+ return new BufferInProgress(newWb);
+ }
+ if (resultBuffers.add(buf.buffer)) {
+ if (!buf.buffer.lock(true)) {
+ resultBuffers.remove(buf.buffer);
+ continue; // Buffer was evicted.
+ }
+ if (DebugUtils.isTraceLockingEnabled()) {
+ Llap.LOG.info("Locked " + buf.buffer + " due to reuse");
+ }
+ } else if (!buf.buffer.isLocked()) {
+ throw new AssertionError(buf.buffer + " is in resultBuffers, but is not locked");
+ }
+ }
+ }
+
+ protected final void returnReusableBuffer(BufferInProgress colBuffer) {
+ // Check space - 16 is chunk header plus one segment header, minimum required space.
+ // This should be extremely rare.
+ // TODO: use different value that makes some sense
+ // TODO: with large enough stripes it might be better not to split every stripe into two
+ // buffers but instead not reuse the buffer if e.g. 1Mb/15Mb is left.
+ if (colBuffer.getSpaceLeft() < 16) return;
+ reusableBuffers.add(colBuffer);
+ }
+
+ protected Chunk mergeResultChunks(BufferInProgress colBuffer,
+ Chunk existingResult, boolean finalCheck) throws IOException {
+ // Both should be extracted in one method, but it's too painful to do in Java.
+ int rowCount = colBuffer.getChunkInProgressRows();
+ Chunk chunk = colBuffer.extractChunk();
+ if (rowCount <= 0) {
+ if (finalCheck && existingResult == null) {
+ throw new IOException("No rows written for column");
+ }
+ return existingResult;
+ }
+ writer.finishChunk(chunk, rowCount);
+ return (existingResult == null) ? chunk : existingResult.addChunk(chunk);
+ }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.loader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.api.impl.VectorImpl;
+import org.apache.hadoop.hive.llap.cache.BufferPool;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.cache.MetadataCache;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriterImpl;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.apache.hadoop.hive.llap.processor.ChunkConsumer;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type.Kind;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+public class OrcLoader extends Loader {
+ private final ChunkPool<ChunkKey> chunkPool;
+ private FileSystem cachedFs = null;
+ private final MetadataCache metadataCache = new MetadataCache();
+
+ public OrcLoader(BufferPool bufferPool, ChunkPool<ChunkKey> chunkPool,
+ Configuration conf) throws IOException {
+ super(bufferPool);
+ this.chunkPool = chunkPool;
+ // We assume all splits will come from the same FS.
+ this.cachedFs = FileSystem.get(conf);
+ }
+
+ @Override
+ protected void loadInternal(RequestImpl request, ChunkConsumer consumer, LoadContext context)
+ throws IOException, InterruptedException {
+ // TODO: decide on - LocalActorSystem.INSTANCE.enqueue(request, bufferPool, consumer);
+ if (DebugUtils.isTraceMttEnabled()) {
+ Llap.LOG.info("loadInternal called");
+ }
+ List<Integer> includedCols = request.getColumns();
+ if (includedCols != null) {
+ Collections.sort(includedCols);
+ }
+ FileSplit fileSplit = (FileSplit)request.getSplit();
+ String internedFilePath = fileSplit.getPath().toString().intern();
+ Llap.LOG.info("Processing split for " + internedFilePath);
+ if (context.isStopped) return;
+ List<StripeInformation> stripes = metadataCache.getStripes(internedFilePath);
+ List<Type> types = metadataCache.getTypes(internedFilePath);
+ Reader reader = null;
+ if (stripes == null || types == null) {
+ reader = createReader(fileSplit);
+ if (stripes == null) {
+ stripes = reader.getStripes();
+ metadataCache.cacheStripes(internedFilePath, stripes);
+ }
+ if (types == null) {
+ types = reader.getTypes();
+ metadataCache.cacheTypes(internedFilePath, types);
+ }
+ }
+
+ // Determine which stripes belong to this split and make keys to get chunks from cache.
+ // This assumes all splits will have the same columns.
+ if (includedCols == null) {
+ includedCols = new ArrayList<Integer>(types.size());
+ for (int i = 1; i < types.size(); ++i) {
+ includedCols.add(i);
+ }
+ }
+ List<List<ChunkKey>> keys = new ArrayList<List<ChunkKey>>();
+ long stripeIxFromAndTo = determineStripesAndCacheKeys(
+ fileSplit, includedCols, internedFilePath, stripes, keys);
+ int stripeIxFrom = (int)(stripeIxFromAndTo >>> 32),
+ stripeIxTo = (int)(stripeIxFromAndTo & (long)Integer.MAX_VALUE);
+
+ // Prepare structures for tracking the results.
+ int resultVectorCount = stripeIxTo - stripeIxFrom;
+ Chunk[][] resultMatrix = new Chunk[resultVectorCount][];
+ @SuppressWarnings("unchecked")
+ // TODO: we store result buffers uniquely in a set, so we could lock/unlock them once. This may
+ // be more expensive than just making locking faster, and locking-unlocking as needed.
+ HashSet<WeakBuffer>[] resultBuffers = new HashSet[resultVectorCount];
+ for (int i = 0; i < resultVectorCount; ++i) {
+ resultMatrix[i] = new Chunk[types.size()];
+ resultBuffers[i] = new HashSet<WeakBuffer>();
+ }
+ if (context.isStopped) return;
+ // TODO: after this moment, we must be careful when checking isStopped to avoid
+ // leaving some chunks locked and un-consumed. For now we just never check.
+
+ // For now we will fetch missing results by stripe - this is how reader needs them.
+ List<Integer> readyStripes = getChunksFromCache(
+ keys, types.size(), stripeIxFrom, resultBuffers, resultMatrix);
+ if (readyStripes != null) {
+ Llap.LOG.info("Got " + readyStripes.size() + " full stripes from cache");
+ for (Integer stripeIx : readyStripes) {
+ int stripeIxMod = stripeIx - stripeIxFrom;
+ VectorImpl vector = createVectorForStripe(
+ resultMatrix[stripeIxMod], resultBuffers[stripeIxMod], types, includedCols);
+ if (DebugUtils.isTraceMttEnabled()) {
+ Llap.LOG.info("Returning stripe " + stripeIx + " from cache");
+ }
+ consumer.consumeVector(vector);
+ resultMatrix[stripeIxMod] = null;
+ resultBuffers[stripeIxMod] = null;
+ }
+ }
+
+ // Now we have a set of keys for all the things that are missing. Fetch them...
+ // TODO: this should happen on some sort of IO thread pool.
+ for (List<ChunkKey> stripeKeys : keys) {
+ if (stripeKeys.isEmpty()) continue;
+ int stripeIx = stripeKeys.get(0).stripeIx;
+ StripeInformation si = stripes.get(stripeIx);
+ List<Integer> includeList = null;
+ if (includedCols.size() == stripeKeys.size()) {
+ includeList = includedCols;
+ } else {
+ includeList = new ArrayList<Integer>(stripeKeys.size());
+ for (ChunkKey key : stripeKeys) {
+ includeList.add(key.colIx);
+ }
+ }
+ boolean[] includes = OrcInputFormat.genIncludedColumns(types, includeList, true);
+ if (Llap.LOG.isDebugEnabled()) {
+ Llap.LOG.debug("Reading stripe " + stripeIx + " {"
+ + si.getOffset() + ", " + si.getLength() + "}, cols " + Arrays.toString(includes));
+ }
+
+ if (reader == null) {
+ reader = createReader(fileSplit);
+ }
+
+ RecordReader stripeReader = reader.rows(si.getOffset(), si.getLength(), includes);
+ int stripeIxMod = stripeIx - stripeIxFrom;
+ Chunk[] result = resultMatrix[stripeIxMod];
+ HashSet<WeakBuffer> buffers = resultBuffers[stripeIxMod];
+
+ loadStripe(stripeReader, stripeKeys, result, buffers);
+ stripeReader.close();
+ VectorImpl vector = createVectorForStripe(result, buffers, types, includedCols);
+ if (DebugUtils.isTraceMttEnabled()) {
+ Llap.LOG.info("Returning stripe " + stripeIx + " from FS");
+ }
+ consumer.consumeVector(vector);
+ }
+ consumer.setDone();
+ if (DebugUtils.isTraceMttEnabled()) {
+ Llap.LOG.info("loadInternal is done");
+ }
+ }
+
+ /**
+ * Determines which stripe range belongs to a split, and generates cache keys
+ * for all these stripes and all the included columns.
+ * @param fileSplit The split.
+ * @param includedCols Included columns.
+ * @param internedFilePath Interned file path from the split, for cache keys.
+ * @param stripes Stripe information from the reader.
+ * @param keys The keys for cache lookups are inserted here.
+ * @return Combined int-s for stripe from (inc.) and to (exc.) indexes, because Java is a joke
+ */
+ private long determineStripesAndCacheKeys(FileSplit fileSplit, List<Integer> includedCols,
+ String internedFilePath, List<StripeInformation> stripes, List<List<ChunkKey>> keys) {
+ // The unit of caching for ORC is (stripe x column) (see ChunkKey). Note that we do not use
+ // SARG anywhere, because file-level filtering on sarg is already performed during split
+ // generation, and stripe-level filtering to get row groups is not very helpful right now.
+ long offset = fileSplit.getStart(), maxOffset = offset + fileSplit.getLength();
+ int stripeIxFrom = -1, stripeIxTo = -1, stripeIx = 0;
+ if (Llap.LOG.isDebugEnabled()) {
+ String tmp = "FileSplit {" + fileSplit.getStart()
+ + ", " + fileSplit.getLength() + "}; stripes ";
+ for (StripeInformation stripe : stripes) {
+ tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, ";
+ }
+ Llap.LOG.debug(tmp);
+ }
+
+ for (StripeInformation stripe : stripes) {
+ long stripeStart = stripe.getOffset();
+ if (offset > stripeStart) continue;
+ if (stripeIxFrom == -1) {
+ if (DebugUtils.isTraceEnabled()) {
+ Llap.LOG.info("Including from " + stripeIx
+ + " (" + stripeStart + " >= " + offset + ")");
+ }
+ stripeIxFrom = stripeIx;
+ }
+ if (stripeStart >= maxOffset) {
+ if (DebugUtils.isTraceEnabled()) {
+ Llap.LOG.info("Including until " + stripeIxTo
+ + " (" + stripeStart + " >= " + maxOffset + ")");
+ }
+ stripeIxTo = stripeIx;
+ break;
+ }
+
+ ArrayList<ChunkKey> stripeKeys = new ArrayList<ChunkKey>(includedCols.size());
+ keys.add(stripeKeys);
+ for (Integer colIx : includedCols) {
+ stripeKeys.add(new ChunkKey(internedFilePath, stripeIx, colIx));
+ }
+ ++stripeIx;
+ }
+ if (stripeIxTo == -1) {
+ if (DebugUtils.isTraceEnabled()) {
+ Llap.LOG.info("Including until " + stripeIx + " (end of file)");
+ }
+ stripeIxTo = stripeIx;
+ }
+ return (((long)stripeIxFrom) << 32) + stripeIxTo;
+ }
+
+ /**
+ * Gets chunks from cache and generates include arrays for things to be fetched.
+ * @param keys Keys to get.
+ * @param colCount Column count in the file.
+ * @param stripeIxFrom Stripe index start in the split.
+ * @param resultBuffers Resulting buffers are added here.
+ * @param resultMatrix Results that are fetched from cache are added here.
+ * @return Matrix of things are not cache.
+ */
+ private List<Integer> getChunksFromCache(List<List<ChunkKey>> keys, int colCount,
+ int stripeIxFrom, HashSet<WeakBuffer>[] resultBuffers, Chunk[][] resultMatrix) {
+ List<Integer> readyStripes = null;
+ for (List<ChunkKey> stripeKeys : keys) {
+ int stripeIx = stripeKeys.get(0).stripeIx;
+ int stripeIxMod = stripeIx - stripeIxFrom;
+ Chunk[] chunksForStripe = resultMatrix[stripeIxMod];
+ HashSet<WeakBuffer> buffersForStripe = resultBuffers[stripeIxMod];
+ Iterator<ChunkKey> iter = stripeKeys.iterator();
+ while (iter.hasNext()) {
+ ChunkKey key = iter.next();
+ Chunk result = chunkPool.getChunk(key, buffersForStripe);
+ if (result == null) continue;
+ if (Llap.LOG.isDebugEnabled()) {
+ Llap.LOG.debug("Found result in cache for " + key + ": " + result.toFullString());
+ }
+ chunksForStripe[key.colIx] = result;
+ iter.remove();
+ }
+ if (stripeKeys.isEmpty()) {
+ if (readyStripes == null) {
+ readyStripes = new ArrayList<Integer>();
+ }
+ readyStripes.add(stripeIx);
+ }
+ }
+ return readyStripes;
+ }
+
+ private Reader createReader(FileSplit fileSplit) throws IOException {
+ FileSystem fs = cachedFs;
+ Path path = fileSplit.getPath();
+ Configuration conf = new Configuration();
+ if ("pfile".equals(path.toUri().getScheme())) {
+ fs = path.getFileSystem(conf); // Cannot use cached FS due to hive tests' proxy FS.
+ }
+ return OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+ }
+
+ private void loadStripe(RecordReader reader, List<ChunkKey> keys, Chunk[] results,
+ HashSet<WeakBuffer> resultBuffers) throws IOException, InterruptedException {
+ // Reader is reading a single stripe; read the entirety of each column.
+ Object readCtx = reader.prepareColumnRead();
+ for (int keyIx = 0; keyIx < keys.size(); ++keyIx) {
+ ChunkKey key = keys.get(keyIx);
+ BufferInProgress colBuffer = null;
+ while (true) {
+ colBuffer = prepareReusableBuffer(resultBuffers);
+ writer.prepare(colBuffer);
+ boolean hasMoreValues = reader.readNextColumnStripe(readCtx, writer);
+ if (!hasMoreValues) break;
+ if (DebugUtils.isTraceEnabled()) {
+ Llap.LOG.info("Stripe doesn't fit into buffer");
+ }
+ // We couldn't write all rows to this buffer, so we'll close the chunk.
+ results[key.colIx] = mergeResultChunks(colBuffer, results[key.colIx], false);
+ }
+ // Done with the reader:
+ // 1) add final chunk to result;
+ // 2) add reusable buffer back to list;
+ // 3) add results to cache and resolve conflicts.
+ Chunk val = results[key.colIx] =
+ mergeResultChunks(colBuffer, results[key.colIx], true);
+ if (Llap.LOG.isDebugEnabled()) {
+ Llap.LOG.debug("Caching chunk " + key + " => " + val.toFullString());
+ }
+ Chunk cachedVal = chunkPool.addOrGetChunk(key, val, resultBuffers);
+ if (cachedVal != val) {
+ // Someone else has read and cached the same value while we were reading. Assumed to be
+ // very rare (otherwise we'd need measures to prevent it), so we will not be efficient;
+ // we will rebuild resultBuffers rather than removing buffers from them.
+ results[key.colIx] = cachedVal;
+ resultBuffers.clear();
+ for (int i = 0; i < results.length; ++i) {
+ Chunk chunk1 = results[i];
+ while (chunk1 != null) {
+ resultBuffers.add(chunk1.buffer);
+ chunk1 = chunk1.nextChunk;
+ }
+ }
+ Chunk chunk = cachedVal;
+ while (chunk != null) {
+ if (!resultBuffers.contains(chunk.buffer)) {
+ chunk.buffer.unlock();
+ }
+ chunk = chunk.nextChunk;
+ }
+ }
+ returnReusableBuffer(colBuffer);
+ }
+ }
+
+ private VectorImpl createVectorForStripe(Chunk[] rowForStripe,
+ Collection<WeakBuffer> resultBuffers, List<Type> types, List<Integer> includedCols) {
+ VectorImpl vector = new VectorImpl(resultBuffers, types.size());
+ for (Integer colIx : includedCols) {
+ // TODO: this "+ 1" is a hack relying on knowledge of ORC. It might change, esp. w/ACID.
+ Vector.Type type = vectorTypeFromOrcType(types.get(colIx + 1).getKind());
+ vector.addChunk(colIx, rowForStripe[colIx], type);
+ }
+ return vector;
+ }
+
+ private static Vector.Type vectorTypeFromOrcType(Kind orcType) {
+ switch (orcType) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case DATE:
+ case TIMESTAMP:
+ return Vector.Type.LONG;
+ case FLOAT:
+ case DOUBLE:
+ return Vector.Type.DOUBLE;
+ case STRING:
+ return Vector.Type.BINARY;
+ case DECIMAL:
+ return Vector.Type.DECIMAL;
+ default:
+ throw new UnsupportedOperationException("Unsupported type " + orcType);
+ }
+ }
+
+ public static class ChunkKey {
+ /** @param file This MUST be interned by caller. */
+ private ChunkKey(String file, int stripeIx, int colIx) {
+ this.file = file;
+ this.stripeIx = stripeIx;
+ this.colIx = colIx;
+ }
+ private final String file;
+ private final int stripeIx;
+ private final int colIx;
+
+ @Override
+ public String toString() {
+ return "[" + file + ", stripe " + stripeIx + ", colIx " + colIx + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = prime + ((file == null) ? 0 : System.identityHashCode(file));
+ return (prime * result + colIx) * prime + stripeIx;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof ChunkKey)) return false;
+ ChunkKey other = (ChunkKey)obj;
+ // Strings are interned and can thus be compared like this.
+ return stripeIx == other.stripeIx && colIx == other.colIx && file == other.file;
+ }
+ }
+}
Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.processor;
+
+import org.apache.hadoop.hive.llap.api.Vector;
+
+/**
+ * Interface implemented by reader; allows it to receive blocks asynchronously.
+ */
+public interface ChunkConsumer {
+ public void init(ChunkProducerFeedback feedback);
+ public void setDone();
+ // For now this returns Vector, which has to have full rows.
+ // Vectorization cannot run on non-full rows anyway so that's ok. Maybe later we can
+ // have LazyVRB which only loads columns when needed... one can dream right?
+ public void consumeVector(Vector vector);
+ public void setError(Throwable t);
+}