You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:50:04 UTC

svn commit: r1625344 [2/4] - in /hive/branches/llap: ./ common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/tez/ itests/qtest/ llap-client/ llap-client/src/ llap-client/src/java/ llap-client/src/java/org/ llap-client/src/java/org/apache/ ...

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkReader.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.chunk;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.DoubleBuffer;
+import java.nio.LongBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.type.Decimal128;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils.RleSegmentType;
+
+/**
+ * Chunk reader; reads chained chunks (we might want to separate this later).
+ *
+ * Initial chunk format:
+ * [version byte][int number of rows][padding to 8 bytes](segments)(if string, [dictionary])
+ * Version is 0 for initial format. Segment format:
+ * [type byte][int number of rows][padding to 8 bytes](values).
+ * One value is stored for repeated segments, and none for null-repeated segments. For non-repeated
+ * segments, values and bitmasks are interspersed; an N-byte bitmask is followed by N*8 values,
+ * repeated. The last bitmask may be smaller if there are fewer values, but it is still rounded up
+ * to 8 bytes. Values for nulls are still stored (we could save space by not storing them, like ORC).
+ *
+ * Values are stored with fixed-length and are 8 bytes for long and double; 8 bytes for strings
+ * (dictionary offset+length); TODO bytes for decimals. Values stored for NULLs are undefined.
+ */
+public class ChunkReader implements Vector.ColumnReader {
+  /** Value type of the column; used to look up value sizes in ChunkUtils. */
+  private final Type type;
+  /** Chunk currently being read; replaced by its nextChunk once all of its rows are consumed. */
+  private Chunk chunk;
+
+  // Parallel lists describing the segments selected by the last next() call (one entry each).
+  private List<ByteBuffer> chunkBuffers;
+  private List<RleSegmentType> segmentTypes;
+  private List<Integer> segmentOffsets;
+  private List<Integer> segmentRowCounts;
+  /** Rows to skip in the first selected segment; -1 until a segment has been selected. */
+  private int rowOffsetInFirstSegment;
+
+  /** Rows of the current chunk that next() has not handed out yet. */
+  private int remainingRowsInLastChunk = 0;
+  /** Rows already handed out from the segment that starts at nextSegmentOffset. */
+  private int rowOffsetInNextSegment = 0;
+  /** Byte offset, within the chunk buffer, of the next segment header to read. */
+  private int nextSegmentOffset = 0;
+
+  /** Row count passed to the last next() call; the copy methods count it down as they read. */
+  private int lastRowCountNeeded = 0;
+
+  /**
+   * Creates a reader over the given chunk (and the chunks chained after it).
+   *
+   * @param type value type of the column stored in the chunk
+   * @param chunk first chunk to read
+   * @throws UnsupportedOperationException if the chunk's first byte is not the expected
+   *         format version
+   */
+  public ChunkReader(Type type, Chunk chunk) {
+    this.type = type;
+    this.chunk = chunk;
+    // The first byte of a chunk carries the format version.
+    byte version = chunk.buffer.getContents().get(chunk.offset);
+    if (version != ChunkUtils.FORMAT_VERSION) {
+      throw new UnsupportedOperationException("Chunk doesn't start as expected: " + version);
+    }
+    // Sanity check of the compile-time bitmask size the reading code relies on.
+    int bitmaskBytes = ChunkUtils.BITMASK_SIZE_BYTES;
+    boolean isPowerOfTwo = Integer.bitCount(bitmaskBytes) == 1;
+    if (!isPowerOfTwo || bitmaskBytes < 8) {
+      throw new AssertionError("Must be a power of two >= 8: "+ bitmaskBytes);
+    }
+  }
+
+  /**
+   * Counts the rows that have not been handed out by next() yet: the unread rows of the
+   * current chunk plus the row counts stored in the headers of the chunks that follow it.
+   *
+   * @return number of remaining rows; 0 when there is no current chunk
+   */
+  public int getNumRowsRemaining() {
+    if (chunk == null) {
+      return 0;
+    }
+    int total = remainingRowsInLastChunk;
+    // A partially read chunk is already covered by remainingRowsInLastChunk; start after it.
+    Chunk cursor = (total > 0) ? chunk.nextChunk : chunk;
+    for (; cursor != null; cursor = cursor.nextChunk) {
+      // The int row count sits right after the version byte of the chunk header.
+      total += cursor.buffer.getContents().getInt(cursor.offset + 1);
+    }
+    return total;
+  }
+
+  /**
+   * Selects the segments that cover the next {@code rowCountNeeded} rows, walking the chunk
+   * chain as needed. The selection (buffer, type, offset and row count per segment) replaces
+   * the one made by the previous call and is what the copy/get methods subsequently read.
+   * A segment that is only partially consumed is revisited by the following call, which
+   * resumes at the remembered row offset.
+   *
+   * @param rowCountNeeded number of rows to select; 0 leaves the previous selection untouched
+   */
+  public void next(int rowCountNeeded) {
+    lastRowCountNeeded = rowCountNeeded;
+    if (rowCountNeeded == 0) return;
+    ByteBuffer chunkBuffer = null;
+    if (chunkBuffers == null) {
+      init();
+    } else {
+      if (remainingRowsInLastChunk > 0) {
+        // Keep reading the chunk the previous call stopped in; grab its buffer before reset().
+        chunkBuffer = chunkBuffers.get(chunkBuffers.size() - 1);
+      }
+      reset();
+    }
+
+    while (chunk != null && rowCountNeeded > 0) {
+      if (chunkBuffer == null) {
+        // Starting a new chunk: read its header and position at the first segment.
+        chunkBuffer = createChunkBuffer(chunk);
+        remainingRowsInLastChunk = chunkBuffer.getInt(1); // skip header byte
+        nextSegmentOffset = 8;
+      }
+      while (rowCountNeeded > 0 && remainingRowsInLastChunk > 0) {
+        int segmentOffset = nextSegmentOffset;
+        assert (segmentOffset & 7) == 0; // must be 8-byte aligned
+        // Segment header: [type byte][int row count], padded to 8 bytes.
+        RleSegmentType segmentType = RleSegmentType.fromInt(chunkBuffer.get(nextSegmentOffset));
+        ++nextSegmentOffset;
+        int segmentRowCount = chunkBuffer.getInt(nextSegmentOffset);
+        int segmentRowOffset = rowOffsetInNextSegment; // Only non-zero for the first segment.
+        nextSegmentOffset += 7;
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Segment type " + segmentType + " with " + segmentRowCount
+              + " rows (skipping " + segmentRowOffset + "); at " + nextSegmentOffset + "; "
+              + remainingRowsInLastChunk + " more in this chunk including this segment");
+        }
+        int dataLength = ChunkUtils.getSegmentDataSize(type, segmentType, segmentRowCount);
+        int segmentRowCountAvail = segmentRowCount - segmentRowOffset;
+        if (segmentRowCountAvail > rowCountNeeded) {
+          // We have some rows remaining in the same segment.
+          nextSegmentOffset = segmentOffset;
+          rowOffsetInNextSegment = segmentRowOffset + rowCountNeeded;
+          segmentRowCountAvail = rowCountNeeded;
+        } else {
+          // Segment fully consumed; the next one starts right after its data.
+          nextSegmentOffset += dataLength;
+          rowOffsetInNextSegment = 0;
+        }
+        remainingRowsInLastChunk -= segmentRowCountAvail;
+        rowCountNeeded -= segmentRowCountAvail;
+
+        // Finally, add segment to data.
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Adding segment with " + segmentRowCountAvail + " rows in "
+              + chunk.buffer + "; segment row offset " + segmentRowOffset);
+        }
+        chunkBuffers.add(chunkBuffer);
+        segmentTypes.add(segmentType);
+        segmentOffsets.add(segmentOffset);
+        segmentRowCounts.add(segmentRowCount);
+        if (rowOffsetInFirstSegment == -1) {
+          rowOffsetInFirstSegment = segmentRowOffset;
+        }
+      }
+      if (remainingRowsInLastChunk == 0) {
+        // We are done with current chunk.
+        chunk = chunk.nextChunk;
+        nextSegmentOffset = 0;
+      } else {
+        assert rowCountNeeded == 0;
+      }
+      chunkBuffer = null;
+    }
+    // rowCountNeeded has been counted down by now; report the count that was requested.
+    assert !chunkBuffers.isEmpty() : "No rows found, expected " + lastRowCountNeeded;
+  }
+
+  /** Allocates the per-segment lists and puts the read position into its initial state. */
+  private void init() {
+    rowOffsetInFirstSegment = -1;
+    remainingRowsInLastChunk = 0;
+    segmentRowCounts = new ArrayList<Integer>();
+    segmentOffsets = new ArrayList<Integer>();
+    segmentTypes = new ArrayList<RleSegmentType>();
+    chunkBuffers = new ArrayList<ByteBuffer>();
+  }
+
+  /** Drops the segment selection of the previous next() call; chunk position is kept. */
+  private void reset() {
+    rowOffsetInFirstSegment = -1;
+    segmentOffsets.clear();
+    segmentTypes.clear();
+    segmentRowCounts.clear();
+    chunkBuffers.clear();
+  }
+
+  /**
+   * Builds a buffer view over a single chunk, so that offset 0 of the view is the first byte
+   * of the chunk and its limit is the chunk length.
+   * Note that this moves the position of the buffer returned by getContents().
+   *
+   * @param chunk chunk to expose
+   * @return a slice of the underlying buffer limited to the chunk
+   */
+  private ByteBuffer createChunkBuffer(Chunk chunk) {
+    ByteBuffer contents = chunk.buffer.getContents();
+    if (DebugUtils.isTraceEnabled()) {
+      Llap.LOG.info("Chunk in " + chunk.buffer
+          + " at " + chunk.offset + ", length " + chunk.length);
+    }
+    contents.position(chunk.offset);
+    ByteBuffer chunkView = contents.slice();
+    chunkView.limit(chunk.length);
+    return chunkView;
+  }
+
+  /**
+   * @return true if the current selection is a single repeating segment (value or null)
+   */
+  @Override
+  public boolean isSameValue() {
+    // Assume repeated values wouldn't be written as separate segments
+    if (segmentTypes.size() > 1) {
+      return false;
+    }
+    RleSegmentType onlySegment = segmentTypes.get(0);
+    return onlySegment == RleSegmentType.REPEATING_VALUE
+        || onlySegment == RleSegmentType.REPEATING_NULL;
+  }
+
+  /**
+   * @return true if any selected segment is of a type that can carry nulls
+   *         (repeating null, or unique values with a null bitmask)
+   */
+  @Override
+  public boolean hasNulls() {
+    return segmentTypes.contains(RleSegmentType.REPEATING_NULL)
+        || segmentTypes.contains(RleSegmentType.UNIQUE_NULL_BITMASK);
+  }
+
+  // TODO: Ideally we want to get rid of this and process segments as we go (nextLongs ...);
+  //       however, this may preclude us from predicting whether there are nulls, etc.
+  //       Plus code will be even more complex. But might be worth it.
+  /**
+   * Copies the long values of the segments selected by the last next() call into dest,
+   * starting at destOffset, filling isNull alongside. Counts lastRowCountNeeded down while
+   * reading.
+   *
+   * @param dest destination for values
+   * @param isNull destination for null flags; must be non-null if a repeating-null segment
+   *        is selected (checked by assert only)
+   * @param destOffset first index to write in dest and isNull
+   * @throws IOException if a segment has an unsupported type
+   */
+  @Override
+  public void copyLongs(long[] dest, boolean[] isNull, int destOffset) throws IOException {
+    final int segmentCount = segmentTypes.size();
+    // Only the first segment can start at a non-zero row; later ones are read from row 0.
+    int rowsToSkip = rowOffsetInFirstSegment;
+    for (int ix = 0; ix < segmentCount; ++ix, rowsToSkip = 0) {
+      RleSegmentType segmentType = segmentTypes.get(ix);
+      ByteBuffer chunkBuffer = chunkBuffers.get(ix);
+      int segmentOffset = segmentOffsets.get(ix);
+      int segmentRowCount = segmentRowCounts.get(ix);
+      int rowCount = Math.min(segmentRowCount - rowsToSkip, lastRowCountNeeded);
+      lastRowCountNeeded -= rowCount;
+      int destEnd = destOffset + rowCount;
+      if (DebugUtils.isTraceEnabled()) {
+        Llap.LOG.info("Copying " + rowCount + " rows from segment " + ix
+          + " of type " + segmentType + " segment at " + segmentOffset
+          + " using row offset " + rowsToSkip + " to result offset " + destOffset);
+      }
+      switch (segmentType) {
+      case REPEATING_NULL: {
+        assert isNull != null;
+        Arrays.fill(isNull, destOffset, destEnd, true);
+        destOffset = destEnd;
+        break;
+      }
+      case REPEATING_VALUE: {
+        // The single repeated value follows the 8-byte segment header.
+        long repeated = chunkBuffer.getLong(segmentOffset + 8);
+        Arrays.fill(dest, destOffset, destEnd, repeated);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destEnd, false);
+        }
+        destOffset = destEnd;
+        break;
+      }
+      case UNIQUE_NOT_NULL: {
+        int dataOffset = segmentOffset + 8;
+        assert (dataOffset & 7) == 0; // Must be 8-byte aligned.
+        copyLongValues(chunkBuffer, dataOffset, rowsToSkip, dest, destOffset, rowCount);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destEnd, false);
+        }
+        destOffset = destEnd;
+        break;
+      }
+      case UNIQUE_NULL_BITMASK: {
+        longCopier.initDest(dest);
+        destOffset = copyValuesWithNulls(chunkBuffer, segmentOffset,
+            rowsToSkip, segmentRowCount, longCopier, isNull, destOffset, rowCount);
+        break;
+      }
+      default: throw new IOException("Unsupported segment type " + segmentType);
+      }
+    }
+  }
+
+  /**
+   * Bulk-copies consecutive longs out of a segment without nulls.
+   *
+   * @param chunkBuffer buffer of the chunk containing the segment
+   * @param dataOffset byte offset of the segment's first value; expected to be 8-byte aligned
+   * @param segmentRowOffset number of leading values to skip
+   * @param dest destination array
+   * @param destOffset first index to write in dest
+   * @param rowCountToRead number of values to copy
+   */
+  private void copyLongValues(ByteBuffer chunkBuffer, int dataOffset,
+      int segmentRowOffset, long[] dest, int destOffset, int rowCountToRead) {
+    // Translate the byte offset into an index of the long view, then skip the leading rows.
+    int firstValueIx = (dataOffset >>> 3) + segmentRowOffset;
+    LongBuffer longView = chunkBuffer.asLongBuffer();
+    longView.position(firstValueIx);
+    longView.get(dest, destOffset, rowCountToRead);
+    if (DebugUtils.isTraceDataEnabled()) {
+      Llap.LOG.info("Copied " + rowCountToRead + " rows from long offset "
+        + firstValueIx + " (" + dataOffset + ", "
+          + segmentRowOffset + "): " + DebugUtils.toString(dest, destOffset, rowCountToRead));
+      Llap.LOG.debug("VRB vector now looks like " + Arrays.toString(dest));
+    }
+  }
+
+  /**
+   * Copies the double values of the segments selected by the last next() call into dest,
+   * starting at destOffset, filling isNull alongside. Counts lastRowCountNeeded down while
+   * reading.
+   *
+   * @param dest destination for values
+   * @param isNull destination for null flags; must be non-null if a repeating-null segment
+   *        is selected (checked by assert only)
+   * @param destOffset first index to write in dest and isNull
+   * @throws IOException if a segment has an unsupported type
+   */
+  @Override
+  public void copyDoubles(double[] dest, boolean[] isNull, int destOffset) throws IOException {
+    // Only the first segment can start at a non-zero row; it is reset to 0 after each segment.
+    int segmentRowOffset = rowOffsetInFirstSegment;
+    for (int segmentIx = 0; segmentIx < segmentTypes.size(); ++segmentIx) {
+      int segmentRowCount = segmentRowCounts.get(segmentIx);
+      RleSegmentType segmentType = segmentTypes.get(segmentIx);
+      ByteBuffer chunkBuffer = chunkBuffers.get(segmentIx);
+      int rowCountToRead = Math.min(segmentRowCount - segmentRowOffset, lastRowCountNeeded);
+      lastRowCountNeeded -= rowCountToRead;
+      switch (segmentType) {
+      case REPEATING_NULL: {
+        assert isNull != null;
+        Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, true);
+        destOffset += rowCountToRead;
+        break;
+      }
+      case REPEATING_VALUE: {
+        // The repeated value is stored once, right after the 8-byte segment header. It has to
+        // be read as a double: reading it as a long and letting Arrays.fill widen it would
+        // convert the raw bit pattern numerically and store a wrong value.
+        double value = chunkBuffer.getDouble(segmentOffsets.get(segmentIx) + 8);
+        Arrays.fill(dest, destOffset, destOffset + rowCountToRead, value);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, false);
+        }
+        destOffset += rowCountToRead;
+        break;
+      }
+      case UNIQUE_NOT_NULL: {
+        int dataOffset = segmentOffsets.get(segmentIx) + 8;
+        assert (dataOffset & 7) == 0; // Must be 8-byte aligned.
+        copyDoubleValues(chunkBuffer, dataOffset, segmentRowOffset,
+            dest, destOffset, rowCountToRead);
+        if (isNull != null) {
+          Arrays.fill(isNull, destOffset, destOffset + rowCountToRead, false);
+        }
+        destOffset += rowCountToRead;
+        break;
+      }
+      case UNIQUE_NULL_BITMASK: {
+        doubleCopier.initDest(dest);
+        destOffset = copyValuesWithNulls(chunkBuffer, segmentOffsets.get(segmentIx),
+            segmentRowOffset, segmentRowCount, doubleCopier, isNull, destOffset, rowCountToRead);
+        break;
+      }
+      default: throw new IOException("Unsupported segment type " + segmentType);
+      }
+      segmentRowOffset = 0;
+    }
+  }
+
+  /**
+   * Bulk-copies consecutive doubles out of a segment without nulls.
+   *
+   * @param chunkBuffer buffer of the chunk containing the segment
+   * @param dataOffset byte offset of the segment's first value; expected to be 8-byte aligned
+   * @param segmentRowOffset number of leading values to skip
+   * @param dest destination array
+   * @param destOffset first index to write in dest
+   * @param rowCountToRead number of values to copy
+   */
+  private void copyDoubleValues(ByteBuffer chunkBuffer, int dataOffset,
+      int segmentRowOffset, double[] dest, int destOffset, int rowCountToRead) {
+    // Translate the byte offset into an index of the double view, then skip the leading rows.
+    int firstValueIx = (dataOffset >>> 3) + segmentRowOffset;
+    DoubleBuffer doubleView = chunkBuffer.asDoubleBuffer();
+    doubleView.position(firstValueIx);
+    doubleView.get(dest, destOffset, rowCountToRead);
+  }
+
+  /**
+   * Copies rows out of a segment that interleaves bitmasks with values. Bits are copied
+   * into isNull and values are copied via valueHelper, both starting at destOffset.
+   * The work is done in up to four steps: skip whole bitmask groups, read the remainder of a
+   * partially skipped bitmask byte, read the remainder of a partially skipped bitmask, and
+   * then read whole bitmask groups until rowCountToRead rows have been copied.
+   *
+   * @param chunkBuffer buffer of the chunk containing the segment
+   * @param segmentDataOffset byte offset of the segment header; the first bitmask is read
+   *        8 bytes after it
+   * @param segmentRowOffset number of leading rows of the segment to skip
+   * @param segmentRowCount total number of rows in the segment (used to size the last bitmask)
+   * @param valueHelper copier for the values; its destination must already be initialized
+   * @param isNull destination of the bitmask bits; written unconditionally
+   * @param destOffset first destination index to write
+   * @param rowCountToRead number of rows to copy
+   * @return destination index following the last row written
+   */
+  private int copyValuesWithNulls(ByteBuffer chunkBuffer, int segmentDataOffset,
+      int segmentRowOffset, int segmentRowCount, ValueCopier valueHelper, boolean[] isNull,
+      int destOffset, int rowCountToRead) {
+    if (rowCountToRead == 0) return destOffset;
+    int valueSize = ChunkUtils.TYPE_SIZES[type.value()];
+    // Prepare to read (or skip) the first bitmask.
+    int bitmasksSkipped = 0;
+    int currentBitmaskOffset = segmentDataOffset + 8;
+    int currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+    // Size of bitmask and corresponding values in bytes, for a BITMASK_SIZE_BYTES-sized bitmask.
+    int sizeOfBitmaskAndValues = ChunkUtils.getFullBitmaskSize(valueSize);
+    valueHelper.initSrc(chunkBuffer);
+
+    // For the first segment, we might have to skip some rows. This is sadly most of this method.
+    if (segmentRowOffset > 0) {
+      // First, see how many full bitmasks we need to skip.
+      int bitmasksToSkip = (segmentRowOffset / ChunkUtils.BITMASK_SIZE_BITS);
+      assert bitmasksToSkip == 0 || currentBitmaskSize == ChunkUtils.BITMASK_SIZE_BYTES;
+      bitmasksSkipped += bitmasksToSkip;
+      currentBitmaskOffset += (bitmasksToSkip * sizeOfBitmaskAndValues);
+      currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+      segmentRowOffset = segmentRowOffset % ChunkUtils.BITMASK_SIZE_BITS;
+
+      // Remember how many values we are skipping in the current bitmask, for value copying.
+      int valuesToSkip = segmentRowOffset;
+      // Then, in the bitmask we are in, skip however many full bytes we need to skip.
+      int currentOffsetInBitmask = 0;
+      if (segmentRowOffset >= 8) {
+        int bytesToSkip = segmentRowOffset >>> 3;
+        currentOffsetInBitmask = bytesToSkip;
+        segmentRowOffset = segmentRowOffset & 7;
+      }
+
+      if (DebugUtils.isTraceEnabled()) {
+        Llap.LOG.info("Skipping " + bitmasksToSkip + " bitmasks and "
+            + currentOffsetInBitmask + " bytes; for a bitmask at " + currentBitmaskOffset
+            + " will skip " + valuesToSkip + " values and " + segmentRowOffset + " bits");
+      }
+      // Finally, we may need to skip some bits in the first byte we are reading.
+      // Read the partial byte of the bitmask (and corresponding long values).
+      if (segmentRowOffset > 0) {
+        int partialByteRowCount = Math.min(rowCountToRead, 8 - segmentRowOffset);
+        copyBitsFromByte(chunkBuffer.get(currentBitmaskOffset + currentOffsetInBitmask),
+            isNull, destOffset, segmentRowOffset, partialByteRowCount);
+        valueHelper.copyValues(destOffset,
+            currentBitmaskOffset + currentBitmaskSize, valuesToSkip, partialByteRowCount);
+        if (DebugUtils.isTraceDataEnabled()) {
+          // Log the byte that was actually processed, not the first byte of the bitmask.
+          Llap.LOG.info("After partial first byte w/" + partialByteRowCount
+              + ", byte was " + chunkBuffer.get(currentBitmaskOffset + currentOffsetInBitmask));
+          Llap.LOG.debug("After partial first byte w/" + partialByteRowCount
+              + ", booleans are " + DebugUtils.toString(isNull));
+        }
+
+        rowCountToRead -= partialByteRowCount;
+        destOffset += partialByteRowCount;
+        ++currentOffsetInBitmask;
+        if (currentOffsetInBitmask == ChunkUtils.BITMASK_SIZE_BYTES && rowCountToRead > 0) {
+          // We only needed part of the last byte from this bitmask, go to the next one.
+          ++bitmasksSkipped;
+          currentBitmaskOffset += sizeOfBitmaskAndValues;
+          currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+          currentOffsetInBitmask = 0;
+        }
+      }
+
+      if (rowCountToRead == 0) return destOffset;
+
+      // Then, if we have a partial bitmask, get to the boundary.
+      if (currentOffsetInBitmask > 0) {
+        // First, copy the bits, then the values at the same destOffset.
+        for (int i = currentOffsetInBitmask, tmpToRead = rowCountToRead, tmpOffset = destOffset;
+            (i < currentBitmaskSize) && (tmpToRead > 0); ++i) {
+          int bitsToRead = Math.min(tmpToRead, 8);
+          copyBitsFromByte(chunkBuffer.get(currentBitmaskOffset + i),
+              isNull, tmpOffset, 0, bitsToRead);
+          if (DebugUtils.isTraceDataEnabled()) {
+            Llap.LOG.info("After copying " + bitsToRead +" bits from byte at "
+                + (currentBitmaskOffset + i) + " to " + tmpOffset + ", booleans are "
+                + DebugUtils.toString(isNull));
+          }
+          tmpOffset += bitsToRead;
+          tmpToRead -= bitsToRead;
+        }
+        valuesToSkip = currentOffsetInBitmask << 3;
+        int valuesToRead = Math.min((currentBitmaskSize << 3) - valuesToSkip, rowCountToRead);
+        valueHelper.copyValues(destOffset, currentBitmaskOffset + currentBitmaskSize,
+            valuesToSkip, valuesToRead);
+        destOffset += valuesToRead;
+        rowCountToRead -= valuesToRead;
+        if (rowCountToRead == 0) return destOffset;
+        // Go to next bitmask.
+        currentBitmaskOffset += sizeOfBitmaskAndValues;
+        ++bitmasksSkipped;
+        currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+      }
+    } // end of the epic "segmentRowOffset > 0" if
+
+    // This is the main code path
+    if (DebugUtils.isTraceEnabled()) {
+      Llap.LOG.info("After segment offset, reading " + rowCountToRead + " rows from data at "
+          + currentBitmaskOffset + " with " + segmentRowCount + " rows to offset " + destOffset
+          + "; bitmask size " + currentBitmaskSize);
+    }
+
+    // Now we are finally done with all the crooked offsets (if any) so we can just read the data.
+    while (true) {
+      // Read one bitmask and corresponding values.
+      for (int i = 0, tmpCountToRead = rowCountToRead, tmpOffset = destOffset;
+          i < currentBitmaskSize && tmpCountToRead > 0; ++i) {
+        byte b = chunkBuffer.get(currentBitmaskOffset + i);
+        int bitsToRead = Math.min(tmpCountToRead, 8);
+        copyBitsFromByte(b & 0xff, isNull, tmpOffset, 0, bitsToRead);
+        if (DebugUtils.isTraceDataEnabled()) {
+          Llap.LOG.info("Copied " + bitsToRead + " bits from " + b + " ("
+              + Integer.toBinaryString(b & 0xff) + ") at " + (currentBitmaskOffset + i)
+              + " to " + tmpOffset + "; current state is " + DebugUtils.toString(isNull));
+        }
+        tmpOffset += bitsToRead;
+        tmpCountToRead -= bitsToRead;
+      }
+      // Values of a group start right after its bitmask.
+      int valuesToRead = Math.min(currentBitmaskSize << 3, rowCountToRead);
+      int valuesOffset = currentBitmaskOffset + currentBitmaskSize;
+      valueHelper.copyValues(destOffset, valuesOffset, 0, valuesToRead);
+      destOffset += valuesToRead;
+      rowCountToRead -= valuesToRead;
+      if (rowCountToRead == 0) break;
+
+      ++bitmasksSkipped;
+      currentBitmaskOffset += sizeOfBitmaskAndValues;
+      currentBitmaskSize = determineBitmaskSizeBytes(bitmasksSkipped, segmentRowCount);
+    }
+    return destOffset;
+  }
+
+  /**
+   * Type-specific value copier. Lets the bitmask handling (in particular its offset logic)
+   * be shared between the methods that copy different datatypes.
+   */
+  private interface ValueCopier {
+    /** Sets the chunk buffer that subsequent copyValues calls read from. */
+    void initSrc(ByteBuffer chunkBuffer);
+
+    /**
+     * Copies values from the source buffer into the copier's destination.
+     *
+     * @param destOffset first destination index to write
+     * @param valuesOffsetBytes byte offset, in the source buffer, of the first value of the group
+     * @param valuesToSkip number of leading values of the group to skip
+     * @param valuesToCopy number of values to copy
+     */
+    void copyValues(int destOffset, int valuesOffsetBytes, int valuesToSkip, int valuesToCopy);
+  }
+
+  private static class LongCopier implements ValueCopier {
+    LongBuffer dataBuffer = null;
+    long[] dest;
+    public void initDest(long[] dest) {
+      this.dest = dest;
+    }
+    public void initSrc(ByteBuffer chunkBuffer) {
+      dataBuffer = chunkBuffer.asLongBuffer();
+    }
+
+    public void copyValues(int destOffset,
+        int valuesOffsetBytes, int valuesToSkip, int valuesToCopy) {
+      dataBuffer.position((valuesOffsetBytes >>> 3) + valuesToSkip);
+      dataBuffer.get(dest, destOffset, valuesToCopy);
+      if (DebugUtils.isTraceDataEnabled()) {
+        Llap.LOG.info("After copying " + valuesToCopy + " from " + valuesOffsetBytes +
+            " (skip " + valuesToSkip + ", long offset "+ ((valuesOffsetBytes >>> 3) + valuesToSkip)
+             + ") to " + destOffset + ", values are " + DebugUtils.toString(dest, destOffset,
+                 valuesToCopy) + " and dest is " + DebugUtils.toString(dest, 0, dest.length));
+      }
+    }
+  }
+
+  private static class DoubleCopier implements ValueCopier {
+    DoubleBuffer dataBuffer = null;
+    double[] dest;
+    public void initDest(double[] dest) {
+      this.dest = dest;
+    }
+    public void initSrc(ByteBuffer chunkBuffer) {
+      dataBuffer = chunkBuffer.asDoubleBuffer();
+    }
+
+    public void copyValues(int destOffset,
+        int valuesOffsetBytes, int valuesToSkip, int valuesToCopy) {
+      valuesOffsetBytes += (valuesToSkip << 3);
+      dataBuffer.position(valuesOffsetBytes >>> 3);
+      dataBuffer.get(dest, destOffset, valuesToCopy);
+    }
+  }
+  private LongCopier longCopier = new LongCopier();
+  private DoubleCopier doubleCopier = new DoubleCopier();
+
+  /**
+   * Computes the size in bytes of the bitmask that follows {@code skipped} full bitmasks.
+   *
+   * @param skipped number of full bitmask groups preceding the bitmask in question
+   * @param segmentRowCount total number of rows in the segment
+   * @return BITMASK_SIZE_BYTES for a full bitmask; otherwise the number of bytes needed for
+   *         the remaining rows, rounded up to a multiple of 8
+   */
+  private int determineBitmaskSizeBytes(int skipped, int segmentRowCount) {
+    int rowsLeft = segmentRowCount - ChunkUtils.BITMASK_SIZE_BITS * skipped;
+    if (rowsLeft >= ChunkUtils.BITMASK_SIZE_BITS) {
+      return ChunkUtils.BITMASK_SIZE_BYTES;
+    }
+    // Last, shorter bitmask: one bit per remaining row, padded to 8 bytes.
+    return ChunkUtils.align8(ChunkUtils.bitMaskSize(rowsLeft));
+  }
+
+  @Override
+  public long getLong() {
+    return chunkBuffers.get(0).getLong(segmentOffsets.get(0) + 8);
+  }
+
+  @Override
+  public double getDouble() {
+    return chunkBuffers.get(0).getDouble(segmentOffsets.get(0) + 8);
+  }
+
+  /**
+   * Expands bits of a bitmask byte into booleans, most significant bit first.
+   *
+   * @param b the bitmask byte; only its low 8 bits are examined for in-range arguments
+   * @param dest destination array
+   * @param offset first index to write in dest
+   * @param skipBits number of leading (most significant) bits to skip
+   * @param bitCount number of bits to copy
+   */
+  private void copyBitsFromByte(int b, boolean[] dest, int offset, int skipBits, int bitCount) {
+    // TODO: we could unroll the loop for full-byte copy.
+    for (int i = 0; i < bitCount; ++i) {
+      int bitIx = 7 - skipBits - i;
+      dest[offset + i] = (b & (1 << bitIx)) != 0;
+    }
+  }
+
+  // TODO: add support for Decimal and Binary
+
+  /**
+   * Not implemented.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Decimal128 getDecimal() {
+    final String message = "Decimal not currently supported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void copyDecimals(Decimal128[] dest, boolean[] isNull, int offset) {
+    final String message = "Decimal not currently supported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public byte[] getBytes() {
+    final String message = "Binary not currently supported";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void copyBytes(byte[][] dest, int[] destStarts, int[] destLengths,
+      boolean[] isNull, int offset) {
+    final String message = "Binary not currently supported";
+    throw new UnsupportedOperationException(message);
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkUtils.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.chunk;
+
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+
+/** Constants and size/offset arithmetic shared by ChunkReader and ChunkWriter. */
+public class ChunkUtils {
+  /** Size in bytes of a full bitmask in a segment with nulls. */
+  public final static int BITMASK_SIZE_BYTES = 8;
+  /** Number of rows covered by a full bitmask. */
+  public final static int BITMASK_SIZE_BITS = BITMASK_SIZE_BYTES * 8;
+
+  /** Value expected in the first byte of a chunk. */
+  public static final byte FORMAT_VERSION = 0;
+  /** Per-value size in bytes, indexed by Type.value(); -1 marks unsupported types. */
+  public static final byte[] TYPE_SIZES = new byte[Type.BINARY.value() + 1];
+  static {
+    TYPE_SIZES[Type.LONG.value()] = 8;
+    TYPE_SIZES[Type.DOUBLE.value()] = 8;
+    TYPE_SIZES[Type.DECIMAL.value()] = -1; // TODO: add support for decimal
+    TYPE_SIZES[Type.BINARY.value()] = -1; // TODO: add support for binary
+  }
+
+  /** Kind of a segment, as stored in the first byte of the segment header. */
+  public static enum RleSegmentType {
+    INVALID(0),
+    REPEATING_NULL(1),
+    REPEATING_VALUE(2),
+    UNIQUE_NOT_NULL(3),
+    UNIQUE_NULL_BITMASK(4);
+
+    private final byte value;
+
+    private RleSegmentType(int val) {
+      assert val >= Byte.MIN_VALUE && val <= Byte.MAX_VALUE;
+      this.value = (byte)val;
+    }
+
+    // Lookup table from stored byte value to enum constant.
+    private static final RleSegmentType[] ints = new RleSegmentType[UNIQUE_NULL_BITMASK.value + 1];
+    static {
+      for (RleSegmentType segmentType : RleSegmentType.values()) {
+        ints[segmentType.value] = segmentType;
+      }
+    }
+
+    /** Maps a stored value back to its constant; the value is not range-checked. */
+    public static RleSegmentType fromInt(int value) {
+      return ints[value];
+    }
+
+    /** @return the byte value stored in the segment header for this type */
+    public byte getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * Computes the size in bytes of a segment's data (everything after the segment header).
+   *
+   * @param type value type, used to look up the per-value size
+   * @param segmentType kind of the segment
+   * @param rowCount number of rows in the segment
+   * @return data size in bytes
+   */
+  public static int getSegmentDataSize(Type type, RleSegmentType segmentType, int rowCount) {
+    int valueSize = TYPE_SIZES[type.value()];
+    switch (segmentType) {
+    case REPEATING_NULL:
+      return 0;
+    case REPEATING_VALUE:
+      return valueSize;
+    case UNIQUE_NOT_NULL:
+      return valueSize * rowCount;
+    case UNIQUE_NULL_BITMASK: {
+      int bitmaskBytes = align8(bitMaskSize(rowCount));
+      return valueSize * rowCount + bitmaskBytes;
+    }
+    default:
+      throw new AssertionError("Unsupported segment type " + segmentType);
+    }
+  }
+
+  /** @return number of bytes needed to hold one bit per row */
+  public static int bitMaskSize(int rowCount) {
+    int fullBytes = rowCount >>> 3;
+    boolean hasPartialByte = (rowCount & 7) > 0;
+    return hasPartialByte ? fullBytes + 1 : fullBytes;
+  }
+
+  /** @return number rounded up to the next multiple of 8 */
+  public static int align8(int number) {
+    return (number + 7) & ~7;
+  }
+
+  /** @return number rounded up to the next multiple of 64 */
+  public static int align64(int number) {
+    return (number + 63) & ~63;
+  }
+
+  /**
+   * @return 8 plus, for segments with a null bitmask, the 8-aligned bitmask size for rowCount
+   */
+  public static int getNonRepeatingValuesOffset(RleSegmentType type, int rowCount) {
+    return (type == RleSegmentType.UNIQUE_NULL_BITMASK)
+        ? 8 + align8(bitMaskSize(rowCount))
+        : 8;
+  }
+
+  /**
+   * @param sizeOf size of one value in bytes
+   * @return size in bytes of a full bitmask together with the values it covers
+   */
+  public static int getFullBitmaskSize(int sizeOf) {
+    return BITMASK_SIZE_BYTES + BITMASK_SIZE_BITS * sizeOf;
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/chunk/ChunkWriterImpl.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.chunk;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils;
+import org.apache.hadoop.hive.llap.chunk.ChunkUtils.RleSegmentType;
+import org.apache.hadoop.hive.llap.loader.BufferInProgress;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+
+/**
+ * Chunk writer. Reusable, not thread safe. See ChunkReader for format details.
+ */
+public class ChunkWriterImpl implements ChunkWriter {
+  private BufferInProgress colBuffer;
+
+  /**
+   * Attaches this writer to a column buffer and positions it at the buffer's write offset.
+   * @param colBuffer Buffer to write into; its current chunk is created if needed.
+   */
+  public void prepare(BufferInProgress colBuffer) {
+    this.colBuffer = colBuffer;
+    final Chunk current = colBuffer.ensureChunk();
+    final boolean isNewChunk = (current.length == 0);
+    if (isNewChunk) {
+      // Leave 8 bytes at the start of a new chunk for the header written by finishChunk().
+      current.length = 8;
+      colBuffer.offset += 8;
+    }
+    this.valuesOffset = colBuffer.offset;
+  }
+
+  /**
+   * Completes the chunk by writing its header (format version byte followed by the
+   * externally-tracked row count) at the start of the chunk.
+   * The chunk does not have to be prepare()-d - any chunk can be updated via this method.
+   * @param chunk Chunk to update.
+   * @param rowCount Row count in the chunk.
+   */
+  public void finishChunk(Chunk chunk, int rowCount) {
+    // Header space was reserved when the chunk was started, so neither the offset nor the
+    // length changes here. No segment may still be open at this point.
+    assert currentSegmentStart == -1;
+    final ByteBuffer contents = chunk.buffer.getContents();
+    final int headerStart = chunk.offset;
+    contents.put(headerStart, ChunkUtils.FORMAT_VERSION);
+    contents.putInt(headerStart + 1, rowCount);
+  }
+
+  // State of current segment.
+  private int currentSegmentStart = -1, currentSegmentValues = -1;
+  private boolean currentSegmentIsRepeating = false, currentSegmentHasNulls = false;
+  private long currentRepeatingLongValue = -1;
+  private int currentSizeOf = -1;
+
+  // State of the unique segment currently being written.
+  private int currentBitmaskOffset = -1, currentBitmaskLimit = -1,
+      valuesOffset = -1, valuesSinceBitmask = -1;
+
+  /**
+   * Estimates how many values of the given type still fit into the buffer.
+   * The estimate assumes unique values without nulls, which can always be written; inside
+   * a bitmask segment the bitmask space is already reserved, so values cost the same there.
+   * The hasNulls argument is not consulted. Callers should re-estimate after every write.
+   */
+  @Override
+  public int estimateValueCountThatFits(Type type, boolean hasNulls) {
+    int usableBytes = colBuffer.getSpaceLeft(valuesOffset) - 8; // Minus 8 bytes of overhead.
+    int bytesPerValue = ChunkUtils.TYPE_SIZES[type.value()];
+    return usableBytes / bytesPerValue;
+  }
+
+  /** Writes srcCount long values taken from a long[] source, starting at srcOffset. */
+  @Override
+  public void writeLongs(long[] src, int srcOffset, int srcCount, NullsState nullsState) {
+    final boolean isLongSrc = true;
+    writeLongsInternal(src, srcOffset, srcCount, nullsState, isLongSrc);
+  }
+
+  /** Writes srcCount values taken from a byte[] source; each byte is stored as a long. */
+  @Override
+  public void writeLongs(byte[] src, int srcOffset, int srcCount, NullsState nullsState) {
+    final boolean isLongSrc = false;
+    writeLongsInternal(src, srcOffset, srcCount, nullsState, isLongSrc);
+  }
+
+  /**
+   * Shared implementation of both writeLongs variants.
+   * @param srcObj Source array; a long[] if isLongSrc is set, a byte[] otherwise.
+   * @param srcOffset Index of the first source value to write.
+   * @param srcCount Number of values to write.
+   * @param nullsState Passed on when deciding what kind of segment to start.
+   * @param isLongSrc Selects how srcObj is interpreted.
+   */
+  private void writeLongsInternal(
+      Object srcObj, int srcOffset, int srcCount, NullsState nullsState, boolean isLongSrc) {
+    long[] longSrc = null;
+    byte[] byteSrc = null;
+    if (isLongSrc) {
+      longSrc = (long[])srcObj;
+    } else {
+      byteSrc = (byte[])srcObj;
+    }
+    ByteBuffer buffer = colBuffer.buffer.getContents();
+    currentSizeOf = ChunkUtils.TYPE_SIZES[Type.LONG.value()];
+    ensureUniqueValueSegment(buffer, srcCount, nullsState);
+
+    if (!currentSegmentHasNulls) {
+      // No bitmasks in this segment - values simply follow each other.
+      if (isLongSrc) {
+        valuesOffset = writeLongs(buffer, valuesOffset, longSrc, srcOffset, srcCount, currentSizeOf);
+      } else {
+        valuesOffset = writeLongs(buffer, valuesOffset, byteSrc, srcOffset, srcCount, currentSizeOf);
+      }
+      currentSegmentValues += srcCount;
+      return;
+    }
+
+    if (valuesSinceBitmask == currentBitmaskLimit) {
+      startNextBitmask(buffer);
+    }
+    // Fill the current bitmask, then open the next one, until the source is exhausted.
+    int remaining = srcCount;
+    int readPos = srcOffset;
+    while (remaining > 0) {
+      int batch = Math.min(currentBitmaskLimit - valuesSinceBitmask, remaining);
+      assert batch > 0 : valuesSinceBitmask + "/" + currentBitmaskLimit + " " + remaining;
+      writeZeroesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, batch);
+      if (isLongSrc) {
+        valuesOffset = writeLongs(buffer, valuesOffset, longSrc, readPos, batch, currentSizeOf);
+      } else {
+        valuesOffset = writeLongs(buffer, valuesOffset, byteSrc, readPos, batch, currentSizeOf);
+      }
+      valuesSinceBitmask += batch;
+      currentSegmentValues += batch;
+      readPos += batch;
+      remaining -= batch;
+      if (remaining <= 0) {
+        break;
+      }
+      assert valuesSinceBitmask == currentBitmaskLimit;
+      startNextBitmask(buffer);
+    }
+  }
+
+  /**
+   * Writes srcCount double values from src, starting at srcOffset, into a unique-value
+   * segment; the segment is started first if one is not already open.
+   */
+  @Override
+  public void writeDoubles(double[] src, int srcOffset, int srcCount, NullsState nullsState) {
+    ByteBuffer buffer = colBuffer.buffer.getContents();
+    currentSizeOf = ChunkUtils.TYPE_SIZES[Type.DOUBLE.value()];
+    ensureUniqueValueSegment(buffer, srcCount, nullsState);
+
+    if (!currentSegmentHasNulls) {
+      // No bitmasks in this segment - values simply follow each other.
+      valuesOffset = writeDoubles(buffer, valuesOffset, src, srcOffset, srcCount, currentSizeOf);
+      currentSegmentValues += srcCount;
+      return;
+    }
+
+    if (valuesSinceBitmask == currentBitmaskLimit) {
+      startNextBitmask(buffer);
+    }
+    // Fill the current bitmask, then open the next one, until the source is exhausted.
+    int remaining = srcCount;
+    int readPos = srcOffset;
+    while (remaining > 0) {
+      int batch = Math.min(currentBitmaskLimit - valuesSinceBitmask, remaining);
+      assert batch > 0;
+      writeZeroesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, batch);
+      valuesOffset = writeDoubles(buffer, valuesOffset, src, readPos, batch, currentSizeOf);
+      valuesSinceBitmask += batch;
+      currentSegmentValues += batch;
+      readPos += batch;
+      remaining -= batch;
+      if (remaining <= 0) {
+        break;
+      }
+      assert valuesSinceBitmask == currentBitmaskLimit;
+      startNextBitmask(buffer);
+    }
+  }
+
+  /**
+   * Records count null rows. Nulls first go into the bitmask of an open bitmask segment,
+   * as far as that bitmask has room; whatever is left goes into a repeating-nulls segment.
+   * The followedByNonNull argument is not consulted.
+   */
+  @Override
+  public void writeNulls(int count, boolean followedByNonNull) {
+    boolean atFreshBitmaskWithLongRun = !currentSegmentIsRepeating
+        && valuesSinceBitmask == 0 && count > END_UNIQUE_SEGMENT_RUN_LEN;
+    if (!currentSegmentHasNulls || atFreshBitmaskWithLongRun) {
+      finishCurrentSegment();
+    }
+
+    boolean inUniqueSegment = (currentSegmentStart != -1) && !currentSegmentIsRepeating;
+    if (inUniqueSegment) {
+      assert currentSegmentHasNulls && valuesSinceBitmask > 0;
+      assert currentSizeOf > 0;
+      ByteBuffer buffer = colBuffer.buffer.getContents();
+      // We are inside an existing bitmask segment. The arbitrary choice made here is to fill
+      // up the current bitmask: value slots are consumed for nulls too, which may waste
+      // space, but avoids producing a huge number of tiny segments.
+      int nullsIntoBitmask = Math.min(currentBitmaskLimit - valuesSinceBitmask, count);
+      if (nullsIntoBitmask > 0) {
+        writeOnesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, nullsIntoBitmask);
+        valuesOffset += (currentSizeOf * nullsIntoBitmask);
+        valuesSinceBitmask += nullsIntoBitmask;
+        currentSegmentValues += nullsIntoBitmask;
+        count -= nullsIntoBitmask;
+      }
+      if (count == 0) {
+        return;
+      }
+      // Might not be ideal if few nulls remain and they are followed by non-nulls.
+      finishCurrentSegment();
+    }
+
+    if (currentSegmentStart == -1) {
+      // No open segment. A bitmask could be better for small counts, but for now a
+      // repeating-nulls segment is always started.
+      currentSizeOf = -1;
+      startRepeatingSegment();
+      currentSegmentHasNulls = true;
+    }
+    assert currentSegmentIsRepeating && currentSegmentHasNulls;
+    currentSegmentValues += count;
+  }
+
+  /** Arbitrary; the tradeoff is between wasting space writing repeated values,
+   *  and having many tiny segments that are more expensive to read. */
+  private static final int END_UNIQUE_SEGMENT_RUN_LEN = 10;
+  /**
+   * Writes count copies of the same long value. Depending on the open segment, the copies
+   * are appended to a unique-value segment (as individual values), added to a compatible
+   * repeating segment, or put into a newly started repeating segment.
+   * The nullsState argument is not consulted by this method.
+   * @param value Value to repeat.
+   * @param count Number of repetitions.
+   */
+  @Override
+  public void writeRepeatedLongs(long value, int count, NullsState nullsState) {
+    // A repeating segment can only be extended if it repeats this very value (not nulls).
+    boolean isIncompatibleRepeating = currentSegmentIsRepeating
+          && (currentSegmentHasNulls || currentRepeatingLongValue != value);
+    boolean isAtEndOfBitmask = !currentSegmentIsRepeating
+        && currentSegmentHasNulls && valuesSinceBitmask == 0;
+    // finishCurrentSegment() returns immediately when no segment is open, so these flags
+    // can be computed from stale state without harm.
+    if (isIncompatibleRepeating || (isAtEndOfBitmask && count >= END_UNIQUE_SEGMENT_RUN_LEN)) {
+      finishCurrentSegment();
+    }
+    if (currentSegmentStart != -1 && !currentSegmentIsRepeating) {
+      // A unique-value segment is open.
+      assert currentSizeOf > 0;
+      ByteBuffer buffer = colBuffer.buffer.getContents();
+      if (currentSegmentHasNulls) {
+        assert valuesSinceBitmask > 0;
+        // See writeNulls - similar logic.
+        // Fill what is left of the current bitmask with individual copies of the value.
+        int valuesToWrite = Math.min(currentBitmaskLimit - valuesSinceBitmask, count);
+        if (valuesToWrite > 0) {
+          writeZeroesIntoBytes(buffer, currentBitmaskOffset, valuesSinceBitmask, valuesToWrite);
+          valuesOffset = writeLongs(buffer, valuesOffset, value, valuesToWrite, currentSizeOf);
+          valuesSinceBitmask += valuesToWrite;
+          currentSegmentValues += valuesToWrite;
+          count -= valuesToWrite;
+        }
+        if (count > 0) {
+          // The bitmask is full; the rest goes into a repeating segment started below.
+          finishCurrentSegment();
+        }
+      } else if (count < END_UNIQUE_SEGMENT_RUN_LEN) {
+        // Short run in a segment without bitmasks: cheaper to write the copies inline.
+        valuesOffset = writeLongs(buffer, valuesOffset, value, count, currentSizeOf);
+        valuesSinceBitmask += count;
+        currentSegmentValues += count;
+        count = 0;
+      } else {
+        // Long run: close the unique segment and use a repeating segment instead.
+        finishCurrentSegment();
+      }
+    }
+    if (count == 0) return;
+    if (currentSegmentStart == -1) {
+      // We have no segment. For small count, starting a bitmask might make sense, but for now
+      // we always start repeated segment, even if it doesn't make any sense.
+      currentSizeOf = ChunkUtils.TYPE_SIZES[Type.LONG.value()];
+      startRepeatingSegment();
+      currentSegmentHasNulls = false;
+      currentRepeatingLongValue = value;
+      // The repeated value is stored once, right after the segment header.
+      colBuffer.buffer.getContents().putLong(valuesOffset, value);
+      valuesOffset += currentSizeOf;
+    }
+    // Either a new segment was started above, or a compatible repeating one is being extended.
+    assert currentSegmentIsRepeating && (currentRepeatingLongValue == value);
+    currentSegmentValues += count;
+  }
+
+  /**
+   * Closes the segment being written, if any: writes its header (type and value count) at
+   * the segment start and reports the new offset and value count to the column buffer.
+   * If the last bitmask of a bitmask segment was only partially used, the bitmask is shrunk
+   * to the 8-byte multiple actually needed and the values following it are moved back by
+   * the number of bytes saved.
+   */
+  @Override
+  public void finishCurrentSegment() {
+    if (currentSegmentStart == -1) return;
+    ByteBuffer buffer = colBuffer.buffer.getContents();
+    RleSegmentType segmentType = RleSegmentType.INVALID;
+    if (currentSegmentIsRepeating || !currentSegmentHasNulls || valuesSinceBitmask == 0
+        || valuesSinceBitmask == currentBitmaskLimit) {
+      // Simple case - just write the type and count into current segment header.
+      segmentType = currentSegmentIsRepeating ? (currentSegmentHasNulls
+              ? RleSegmentType.REPEATING_NULL : RleSegmentType.REPEATING_VALUE)
+          : (currentSegmentHasNulls
+              ? RleSegmentType.UNIQUE_NULL_BITMASK : RleSegmentType.UNIQUE_NOT_NULL);
+    } else {
+      // Complicated case - bitmask is not finished, we may need to move values.
+      segmentType = RleSegmentType.UNIQUE_NULL_BITMASK;
+      // Bits rounded up to a multiple of 64, i.e. the bitmask rounded up to 8 bytes.
+      int adjustedValues = ChunkUtils.align64(valuesSinceBitmask);
+      int bytesShift = (ChunkUtils.align64(currentBitmaskLimit) - adjustedValues) >>> 3;
+      // Will never happen when bitmask is 8 bytes - minimum and maximum sizes are the same.
+      if (bytesShift > 0) {
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Adjusting last bitmask by " + bytesShift + " bytes");
+        }
+        assert currentSizeOf > 0;
+        int valuesSize = valuesSinceBitmask * currentSizeOf;
+        int valuesStart = valuesOffset - valuesSize;
+        if (buffer.hasArray()) {
+          // Buffer positions map to array indexes shifted by arrayOffset(); this matters
+          // when the buffer is a slice of (or wraps a range of) a larger array.
+          byte[] arr = buffer.array();
+          int arrBase = buffer.arrayOffset();
+          System.arraycopy(
+              arr, arrBase + valuesStart, arr, arrBase + valuesStart - bytesShift, valuesSize);
+        } else {
+          // No accessible backing array (e.g. a direct buffer). The destination lies before
+          // the source, so a forward byte-by-byte copy is safe even though the ranges overlap.
+          int dst = valuesStart - bytesShift;
+          for (int i = 0; i < valuesSize; ++i) {
+            buffer.put(dst + i, buffer.get(valuesStart + i));
+          }
+        }
+        valuesOffset -= bytesShift;
+      }
+    }
+    if (DebugUtils.isTraceEnabled()) {
+      Llap.LOG.info("Writing " + segmentType + " header w/ " + currentSegmentValues
+          + " values at " + currentSegmentStart + " till " + valuesOffset);
+    }
+    writeSegmentHeader(buffer, currentSegmentStart, segmentType, currentSegmentValues);
+    colBuffer.update(valuesOffset, currentSegmentValues);
+    currentSegmentStart = -1;
+  }
+
+  /**
+   * Opens a repeating segment at the current write position, leaving 8 bytes for its header.
+   * Whether it repeats a value or nulls (currentSegmentHasNulls) is set by the caller.
+   */
+  private void startRepeatingSegment() {
+    final int headerAt = valuesOffset;
+    currentSegmentStart = headerAt;
+    valuesOffset = headerAt + 8;
+    currentSegmentValues = 0;
+    currentSegmentIsRepeating = true;
+    currentBitmaskOffset = -1; // Repeating segments have no bitmask.
+  }
+
+  /**
+   * Makes sure that a unique-value (non-repeating) segment is open so that valueCount
+   * values can be written; an open segment that cannot be used is finished first.
+   * @param buffer Buffer being written to.
+   * @param valueCount Number of values the caller is about to write.
+   * @param nullsState Used to decide whether a newly started segment gets bitmasks.
+   */
+  private void ensureUniqueValueSegment(ByteBuffer buffer, int valueCount, NullsState nullsState) {
+    boolean forceNoNulls = false;
+    if (currentSegmentStart != -1) {
+      if (!currentSegmentIsRepeating) {
+        if (!currentSegmentHasNulls
+            || canValuesFitIntoCurrentSegment(buffer, valueCount, currentSizeOf)) {
+          return; // We have an unique-value segment w/o bitmasks, or values fit w/bitmasks.
+        }
+        // The values do not fit with bitmasks; the replacement segment is written without.
+        forceNoNulls = true;
+      }
+      finishCurrentSegment();
+    }
+    // There is no unique-value segment (or we just finished one), start one.
+    currentSegmentStart = valuesOffset;
+    valuesOffset += 8; // Leave space for the segment header.
+    currentSegmentIsRepeating = false;
+    valuesSinceBitmask = currentSegmentValues = 0;
+    currentSegmentHasNulls = !forceNoNulls && shouldNewSegmentHaveBitmasks(
+        valueCount, nullsState, buffer, valuesOffset, currentSizeOf);
+    if (!currentSegmentHasNulls) {
+      currentBitmaskOffset = -1;
+    } else {
+      startNextBitmask(buffer);
+    }
+  }
+
+  /**
+   * Starts a new bitmask at the current write position and reserves space for it.
+   * Sets currentBitmaskOffset, resets valuesSinceBitmask, sets currentBitmaskLimit to the
+   * number of values the bitmask covers, and moves valuesOffset past the reserved bitmask
+   * bytes. A full-sized bitmask is used if it fits together with its values; otherwise the
+   * bitmask is sized to the space that is left.
+   * @param buffer Buffer being written to; its limit determines the space left.
+   * @throws AssertionError if not even a single value fits together with its bitmask.
+   */
+  private void startNextBitmask(ByteBuffer buffer) {
+    currentBitmaskOffset = valuesOffset;
+    int spaceLeft = buffer.limit() - currentBitmaskOffset;
+    valuesSinceBitmask = 0;
+    valuesOffset = currentBitmaskOffset;
+    if (spaceLeft >= ChunkUtils.getFullBitmaskSize(currentSizeOf)) {
+      // Most of the time, standard-sized bitmask will fit (we are not at the end of the buffer).
+      currentBitmaskLimit = ChunkUtils.BITMASK_SIZE_BITS;
+      valuesOffset += ChunkUtils.BITMASK_SIZE_BYTES;
+      return;
+    }
+    // Only part of the bitmask will fit, so we will have a smaller one.
+    int incrementSize = 8 + (currentSizeOf << 6); // 8 bytes, 64 values (minimum bitmask alignment)
+    int incrementsThatFit = (spaceLeft / incrementSize);
+    // Per each part, we will add 8 bytes to have space for bitmask, and space for 64 values.
+    valuesOffset += (incrementsThatFit << 3);
+    // Each increment covers 64 values, so the limit is increments * 64 (not / 64); it has
+    // to agree with the 8 bitmask bytes per increment reserved on the line above.
+    currentBitmaskLimit = incrementsThatFit << 6;
+    // If there's more space, try to fit 8 more bytes of bitmask with less than 64 values.
+    spaceLeft = (spaceLeft % incrementSize) - 8; // 8 bytes for that last bitmask
+    if (spaceLeft >= currentSizeOf) {
+      valuesOffset += 8;
+      currentBitmaskLimit += (spaceLeft / currentSizeOf);
+    }
+    if (currentBitmaskLimit == 0) {
+      throw new AssertionError("Bitmask won't fit; caller should have checked that");
+    }
+  }
+
+  /**
+   * Clears the bitmask bits for valuesToWrite non-null values that follow the first
+   * valuesSinceBitmask values of the bitmask. Only whole bytes are written: the unused
+   * tail of a partially filled byte is expected to be zero already, left that way by the
+   * previous call to writeZeroesIntoBytes or writeOnesIntoBytes.
+   */
+  private static void writeZeroesIntoBytes(
+      ByteBuffer buffer, int bitmaskOffset, int valuesSinceBitmask, int valuesToWrite) {
+    final int bitsUsedInLastByte = valuesSinceBitmask & 7;
+    if (bitsUsedInLastByte != 0) {
+      // Skip over the tail of the partial byte.
+      int alreadyZeroed = 8 - bitsUsedInLastByte;
+      valuesToWrite -= alreadyZeroed;
+      if (valuesToWrite <= 0) return;
+      valuesSinceBitmask += alreadyZeroed;
+    }
+    int bytePos = bitmaskOffset + (valuesSinceBitmask >>> 3);
+    for (int bitsLeft = valuesToWrite; bitsLeft > 0; bitsLeft -= 8) {
+      buffer.put(bytePos, (byte)0);
+      ++bytePos;
+    }
+  }
+
+  /**
+   * Sets the bitmask bits for valuesToWrite null values that follow the first
+   * valuesSinceBitmask values of the bitmask. Done in up to three steps: the rest of a
+   * partially filled byte, then whole bytes, then the leading bits of one last byte
+   * (whose remaining bits are written as zeroes).
+   */
+  private static void writeOnesIntoBytes(
+      ByteBuffer buffer, int bitmaskOffset, int valuesSinceBitmask, int valuesToWrite) {
+    int bytePos = bitmaskOffset + (valuesSinceBitmask >>> 3);
+    int remaining = valuesToWrite;
+    int headBits = writeOneBitsFromPartialByte(buffer, bytePos, valuesSinceBitmask, remaining);
+    if (headBits > 0) {
+      remaining -= headBits;
+      ++bytePos;
+    }
+    for (; remaining > 8; remaining -= 8) {
+      buffer.put(bytePos, (byte)0xff);
+      ++bytePos;
+    }
+    if (remaining > 0) {
+      // 'remaining' one-bits at the high end of the byte, zeroes below them.
+      int tailMask = ((1 << remaining) - 1) << (8 - remaining);
+      buffer.put(bytePos, (byte)tailMask);
+    }
+  }
+
+  /**
+   * Sets one-bits in the unused part of a partially filled bitmask byte.
+   * @param buffer Buffer containing the bitmask.
+   * @param bufferOffset Position of the byte to modify.
+   * @param valuesInBitmask Number of values already in the bitmask; its low 3 bits give the
+   *                        number of bits already used in the byte.
+   * @param valuesToWrite Number of one-bits the caller wants to add.
+   * @return Number of bits set in this byte; 0 if the bitmask ends on a byte boundary.
+   */
+  private static int writeOneBitsFromPartialByte(
+      ByteBuffer buffer, int bufferOffset, int valuesInBitmask, int valuesToWrite) {
+    final int usedBits = valuesInBitmask & 7;
+    if (usedBits == 0) return 0;
+    assert usedBits < 8;
+    final byte existing = buffer.get(bufferOffset);
+    final int freeBits = 8 - usedBits;
+    final int bitsToSet = Math.min(freeBits, valuesToWrite);
+    // Make bitsToSet ones, then shift them left so that they directly follow the used bits.
+    final int mask = ((1 << bitsToSet) - 1) << (freeBits - bitsToSet);
+    buffer.put(bufferOffset, (byte)(existing | mask));
+    return bitsToSet;
+  }
+
+  /**
+   * Decides whether a new unique-value segment should be written with null bitmasks.
+   * The rule is rather arbitrary: no bitmasks if the caller states there are no nulls, or
+   * if there are enough values to fill a whole standard bitmask; otherwise bitmasks are
+   * used as long as the values fit into the buffer together with them.
+   */
+  private static boolean shouldNewSegmentHaveBitmasks(
+      int valueCount, NullsState nullsState, ByteBuffer buffer, int offset, int sizeOf) {
+    boolean knownToHaveNoNulls = (nullsState == NullsState.NO_NULLS);
+    if (knownToHaveNoNulls) {
+      return false;
+    }
+    boolean fillsWholeBitmask = (valueCount >= ChunkUtils.BITMASK_SIZE_BITS);
+    if (fillsWholeBitmask) {
+      return false;
+    }
+    return canValuesFitWithBitmasks(buffer, offset, valueCount, sizeOf);
+  }
+
+  /**
+   * Checks whether valueCount more values fit into the current bitmask segment: first into
+   * the room left under the current bitmask, then - for the rest - into further bitmasks
+   * placed after the space reserved for the current one.
+   */
+  private boolean canValuesFitIntoCurrentSegment(ByteBuffer buffer, int valueCount, int sizeOf) {
+    final int roomInCurrentBitmask = currentBitmaskLimit - valuesSinceBitmask;
+    final int overflowCount = valueCount - roomInCurrentBitmask;
+    if (overflowCount <= 0) {
+      return true;
+    }
+    final int nextBitmaskOffset = valuesOffset + roomInCurrentBitmask * sizeOf;
+    return canValuesFitWithBitmasks(buffer, nextBitmaskOffset, overflowCount, sizeOf);
+  }
+
+  /**
+   * @return Whether valueCount values plus their bitmasks take strictly less space than
+   *         what is left in the buffer after the given offset.
+   */
+  private static boolean canValuesFitWithBitmasks(
+      ByteBuffer buffer, int offset, int valueCount, int elementSize) {
+    int bytesNeeded = determineSizeWithBitMask(valueCount, elementSize);
+    int bytesAvailable = buffer.limit() - offset;
+    return bytesNeeded < bytesAvailable;
+  }
+
+  /**
+   * @return Bytes needed for count values of elementSize bytes each, plus a bitmask of one
+   *         bit per value rounded up to a multiple of 64 bits.
+   */
+  private static int determineSizeWithBitMask(int count, int elementSize) {
+    int valueBytes = count * elementSize;
+    int bitmaskBytes = ChunkUtils.align64(count) >>> 3;
+    return valueBytes + bitmaskBytes;
+  }
+
+  /** Writes a segment header at offset: one byte of segment type, then the int row count. */
+  private static void writeSegmentHeader(
+      ByteBuffer buffer, int offset, RleSegmentType type, int rowCount) {
+    final byte typeCode = type.getValue();
+    buffer.put(offset, typeCode);
+    buffer.putInt(offset + 1, rowCount);
+  }
+
+  /**
+   * Copies rowsToWrite longs from cv, starting at cvOffset, into the buffer at offset,
+   * advancing by sizeOf bytes per value.
+   * @return Buffer offset right after the last value written.
+   */
+  private static int writeLongs(
+      ByteBuffer buffer, int offset, long[] cv, int cvOffset, int rowsToWrite, int sizeOf) {
+    assert sizeOf > 0;
+    int writePos = offset;
+    int row = 0;
+    while (row < rowsToWrite) {
+      buffer.putLong(writePos, cv[cvOffset + row]);
+      writePos += sizeOf;
+      ++row;
+    }
+    return writePos;
+  }
+
+  /**
+   * Copies rowsToWrite doubles from cv, starting at cvOffset, into the buffer at offset,
+   * advancing by sizeOf bytes per value.
+   * @return Buffer offset right after the last value written.
+   */
+  private static int writeDoubles(
+      ByteBuffer buffer, int offset, double[] cv, int cvOffset, int rowsToWrite, int sizeOf) {
+    assert sizeOf > 0;
+    int writePos = offset;
+    int row = 0;
+    while (row < rowsToWrite) {
+      buffer.putDouble(writePos, cv[cvOffset + row]);
+      writePos += sizeOf;
+      ++row;
+    }
+    return writePos;
+  }
+
+  /**
+   * Copies rowsToWrite values from the byte array cv, starting at cvOffset, into the buffer
+   * at offset; each byte is widened to a long and written as such, advancing by sizeOf
+   * bytes per value.
+   * @return Buffer offset right after the last value written.
+   */
+  private static int writeLongs(
+      ByteBuffer buffer, int offset, byte[] cv, int cvOffset, int rowsToWrite, int sizeOf) {
+    assert sizeOf > 0;
+    int writePos = offset;
+    int row = 0;
+    while (row < rowsToWrite) {
+      buffer.putLong(writePos, cv[cvOffset + row]);
+      writePos += sizeOf;
+      ++row;
+    }
+    return writePos;
+  }
+
+  /**
+   * Writes the same long value rowsToWrite times into the buffer at offset, advancing by
+   * sizeOf bytes per value.
+   * @return Buffer offset right after the last value written.
+   */
+  private static int writeLongs(
+      ByteBuffer buffer, int offset, long value, int rowsToWrite, int sizeOf) {
+    int writePos = offset;
+    int row = 0;
+    while (row < rowsToWrite) {
+      buffer.putLong(writePos, value);
+      writePos += sizeOf;
+      ++row;
+    }
+    return writePos;
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/BufferInProgress.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.loader;
+
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+
+/**
+ * Helper struct that is used by loaders (e.g. OrcLoader) and chunk writer to write chunks.
+ */
+public class BufferInProgress {
+  /** Buffer that is being written to. */
+  public final WeakBuffer buffer;
+  /** Offset in buffer where writing can proceed */
+  public int offset; // TODO: use WB's position; these have separate lifecycle now, needed?
+  /** Limit of the buffer contents, captured at construction time. */
+  private final int bufferLimit;
+
+  /** The chunk that is currently being written. */
+  private Chunk chunkInProgress = null;
+  /** The row count of the chunk currently being written. */
+  private int chunkInProgressRows = 0;
+
+  /**
+   * @param buffer Buffer to write to; writing starts at offset 0.
+   */
+  public BufferInProgress(WeakBuffer buffer) {
+    this.buffer = buffer;
+    this.bufferLimit = buffer.getContents().limit();
+    this.offset = 0;
+  }
+
+  /**
+   * @return The chunk being written; if there is none, an empty chunk starting at the
+   *         current offset is created first and the row count is reset.
+   */
+  public Chunk ensureChunk() {
+    if (chunkInProgress != null) {
+      return chunkInProgress;
+    }
+    chunkInProgressRows = 0;
+    chunkInProgress = new Chunk(buffer, offset, 0);
+    return chunkInProgress;
+  }
+
+  /**
+   * Hands over the chunk being written and forgets about it.
+   * @return The chunk that was in progress; null if there was none.
+   */
+  public Chunk extractChunk() {
+    final Chunk extracted = chunkInProgress;
+    chunkInProgressRows = 0;
+    chunkInProgress = null;
+    return extracted;
+  }
+
+  /**
+   * Records that data was written up to newOffset: grows the chunk in progress by the
+   * distance from the previous offset and adds to its row count.
+   * @param newOffset New write offset in the buffer.
+   * @param rowsWritten Number of rows added by the write.
+   */
+  public void update(int newOffset, int rowsWritten) {
+    if (newOffset > bufferLimit) {
+      throw new AssertionError("Offset is beyond buffer limit: " + newOffset + "/" + bufferLimit
+         + "; previous offset " + offset + ", chunk " + chunkInProgress);
+    }
+    final int bytesAdded = newOffset - offset;
+    chunkInProgress.length += bytesAdded;
+    offset = newOffset;
+    chunkInProgressRows += rowsWritten;
+  }
+
+  /** @return The row count of the chunk currently being written. */
+  public int getChunkInProgressRows() {
+    return chunkInProgressRows;
+  }
+
+  /** @return Space between the current write offset and the buffer limit. */
+  public int getSpaceLeft() {
+    return getSpaceLeft(-1);
+  }
+
+  /**
+   * @param offset Offset to measure from; a negative value means the current write offset.
+   * @return Space between that offset and the buffer limit.
+   */
+  public int getSpaceLeft(int offset) {
+    final int from = (offset < 0) ? this.offset : offset;
+    return buffer.getContents().limit() - from;
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/ChunkPool.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.loader;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.cache.EvictionListener;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+
+/**
+ * This class contains the mapping of file chunks to buffers inside BufferPool.
+ */
+public class ChunkPool<K> implements EvictionListener {
+  private final ConcurrentHashMap<K, Chunk> chunkCache = new ConcurrentHashMap<K, Chunk>();
+
+  /** Number of unprocessed evictions, for the background thread. */
+  private final AtomicInteger newEvictions = new AtomicInteger(0);
+  private final Thread cleanupThread;
+
+  /** Creates an empty pool and starts its background cleanup thread. */
+  public ChunkPool() {
+    this.cleanupThread = new CleanupThread();
+    this.cleanupThread.start();
+  }
+
+  /**
+   * Gets a chunk from cache, locking the buffers it is stored in.
+   * TODO:  We expect that in most cases, some related chunks (e.g. columns for a stripe)
+   *        will be stored in the same buffer. We could use this to get keys more efficiently.
+   *        On the other hand, real stripes are pretty big.
+   * @param key key to search for.
+   * @param lockedBuffers Buffers already locked by the caller; buffers locked by this call
+   *                      are added to it.
+   * @return Chunk corresponding to key, with its buffers locked; null if there is no such
+   *         chunk, or if the cached chunk could not be locked and was dropped from cache.
+   */
+  public Chunk getChunk(K key, HashSet<WeakBuffer> lockedBuffers) {
+    while (true) {
+      // Re-read the mapping on every attempt. If another thread replaces or removes it
+      // after we failed to lock, retrying with the stale value could never make progress.
+      Chunk result = chunkCache.get(key);
+      if (result == null) {
+        return null;
+      }
+      if (lockChunk(result, lockedBuffers)) return result;
+      // The chunk cannot be locked, so it is stale; drop it if it is still the one cached.
+      if (chunkCache.remove(key, result)) return null;
+    }
+  }
+
+  /**
+   * Locks the buffers of all the chunks in the chain starting at result. Either all of
+   * them end up locked, or none of the locks taken by this call are kept.
+   * @param result First chunk of the chain.
+   * @param lockedBuffers Buffers the caller has already locked; those are not locked again.
+   *                      Buffers locked here are added, and removed again on failure.
+   * @return Whether the whole chain was locked.
+   */
+  private boolean lockChunk(Chunk result, HashSet<WeakBuffer> lockedBuffers) {
+    // We expect the chain to have 1 or 2 buffers (2 if we are on buffer boundary). Keep track of
+    // what we lock in the bitmask; may need fixing (extremely unlikely - 64+ buffer, giant chunks)
+    boolean failedToLock = false;
+    long blocksToUnlock = 0;
+    // The bit indicating that current chunk was locked. This has to be a long shift: in
+    // "1 << 63" the shift is done on an int with the distance masked to 31, which yields
+    // 0xFFFFFFFF80000000 instead of a single bit and makes the per-chunk flags overlap.
+    long bit = 1L << 63;
+
+    Chunk chunk = result;
+    while (chunk != null) {
+      if (lockedBuffers.contains(chunk.buffer)) {
+        assert chunk.buffer.isLocked() : chunk.buffer + " is in lockedBuffers but is not locked";
+      } else if (chunk.buffer.lock(true)) {
+        if (DebugUtils.isTraceLockingEnabled()) {
+          Llap.LOG.info("Locked " + chunk.buffer + " for " + result);
+        }
+        lockedBuffers.add(chunk.buffer);
+        blocksToUnlock |= bit;
+      } else {
+        failedToLock = true;
+        break;
+      }
+      bit >>>= 1;
+      chunk = chunk.nextChunk;
+      if (bit == 1 && chunk != null) {
+        throw new AssertionError("Chunk chain was too long");
+      }
+    }
+    if (!failedToLock) return true;
+
+    // Roll back: walk the chain up to the chunk that failed, and release exactly those
+    // buffers that were locked by this call (not the ones the caller had locked before).
+    bit = 1L << 63;
+    Chunk chunk2 = result;
+    while (chunk2 != chunk) {
+      if ((blocksToUnlock & bit) == bit) {
+        if (DebugUtils.isTraceLockingEnabled()) {
+          Llap.LOG.info("Unlocking " + chunk2.buffer + " due to failed chunk lock");
+        }
+        lockedBuffers.remove(chunk2.buffer);
+        chunk2.buffer.unlock();
+      }
+      bit >>>= 1;
+      chunk2 = chunk2.nextChunk;
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether all the buffers of a chunk chain can currently be locked, by locking
+   * them one after another and then releasing every lock that was taken.
+   * @param entry First chunk of the chain.
+   * @return Whether every buffer in the chain could be locked.
+   */
+  private boolean verifyChunk(Chunk entry) {
+    // Lock until the end of the chain, or until the first buffer that cannot be locked.
+    Chunk firstNotLocked = entry;
+    while (firstNotLocked != null && firstNotLocked.buffer.lock(false)) {
+      firstNotLocked = firstNotLocked.nextChunk;
+    }
+    // Release everything locked above.
+    for (Chunk locked = entry; locked != firstNotLocked; locked = locked.nextChunk) {
+      locked.buffer.unlock();
+    }
+    return firstNotLocked == null;
+  }
+
+  /**
+   * Caches val under key, unless a usable chunk is already cached under that key.
+   * @param key Key to cache the chunk under.
+   * @param val Chunk to cache; its buffer must be locked.
+   * @param lockedBuffers Buffers already locked by the caller; if an existing chunk is
+   *                      returned, buffers locked for it are added.
+   * @return val if it was put into the cache; otherwise the previously cached chunk, with
+   *         its buffers locked.
+   */
+  public Chunk addOrGetChunk(K key, Chunk val, HashSet<WeakBuffer> lockedBuffers) {
+    assert val.buffer.isLocked();
+    for (;;) {
+      final Chunk existing = chunkCache.putIfAbsent(key, val);
+      if (existing == null) {
+        return val;
+      }
+      if (DebugUtils.isTraceCachingEnabled()) {
+        Llap.LOG.info("Trying to cache when the chunk is already cached for "
+            + key + "; old " + existing + ", new " + val);
+      }
+      if (lockChunk(existing, lockedBuffers)) {
+        return existing;
+      }
+      // The existing value cannot be locked; remove it (if still cached) and try again.
+      chunkCache.remove(key, existing);
+    }
+  }
+
+  /**
+   * Counts the eviction for the cleanup thread; the thread is woken up only when this is
+   * the first eviction since the counter was last reset.
+   */
+  @Override
+  public void evictionNotice(WeakBuffer evicted) {
+    final boolean isFirstSinceReset = (newEvictions.getAndIncrement() == 0);
+    if (!isFirstSinceReset) {
+      return;
+    }
+    synchronized (newEvictions) {
+      newEvictions.notifyAll();
+    }
+  }
+
+  /**
+   * A piece of data stored in a buffer at a given offset and length. Chunks can be linked
+   * into a singly-linked chain via nextChunk.
+   */
+  public static class Chunk {
+    public WeakBuffer buffer;
+    public int offset, length;
+    public Chunk nextChunk;
+
+    public Chunk(WeakBuffer buffer, int offset, int length) {
+      this.buffer = buffer;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    /**
+     * Appends another chunk to the end of the chain that starts at this chunk.
+     * @param another Chunk to append.
+     * @return This chunk (the head of the chain).
+     */
+    public Chunk addChunk(Chunk another) {
+      // Traversing list is bad; however, we expect that this will very rarely happen; and in
+      // nearly all the cases when it does (buffer boundary) the list will have 1 element.
+      Chunk tail = this;
+      for (Chunk next = tail.nextChunk; next != null; next = tail.nextChunk) {
+        tail = next;
+      }
+      tail.nextChunk = another;
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return "{" + buffer + ", " + offset + ", " + length + "}";
+    }
+
+    /** @return Descriptions of all the chunks in the chain, each followed by ", ". */
+    public String toFullString() {
+      StringBuilder description = new StringBuilder();
+      for (Chunk current = this; current != null; current = current.nextChunk) {
+        description.append(current.toString()).append(", ");
+      }
+      return description.toString();
+    }
+  }
+
+  /**
+   * Low-priority daemon thread that, after evictions have been reported, walks the chunk
+   * cache and removes the entries whose buffers can no longer be locked. One pass over the
+   * cache is paced to take roughly APPROX_CLEANUP_INTERVAL_SEC.
+   */
+  private final class CleanupThread extends Thread {
+    private static final int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+    public CleanupThread() {
+      super("Llap ChunkPool cleanup thread");
+      setDaemon(true);
+      setPriority(1);
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          doOneCleanupRound();
+        } catch (InterruptedException ex) {
+          Llap.LOG.warn("Cleanup thread has been interrupted");
+          Thread.currentThread().interrupt();
+          break;
+        } catch (Throwable t) {
+          Llap.LOG.error("Cleanup has failed; the thread will now exit", t);
+          break;
+        }
+      }
+    }
+
+    /**
+     * Waits until at least one eviction has been reported, then makes one pass over the
+     * cache, sleeping between entries to spread the work over the cleanup interval.
+     * @throws InterruptedException if the thread is interrupted while waiting or sleeping.
+     */
+    private void doOneCleanupRound() throws InterruptedException {
+      while (true) {
+        int evictionsSinceLast = newEvictions.getAndSet(0);
+        if (evictionsSinceLast > 0) break;
+        synchronized (newEvictions) {
+          newEvictions.wait(10000);
+        }
+      }
+      // Duration is an estimate; if the size of the map changes rapidly, it can be very different.
+      long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+      int processed = 0;
+      // TODO: if this iterator affects the map in some bad way,
+      //       we'd need to sleep once per round instead.
+      Iterator<Map.Entry<K, Chunk>> iter = chunkCache.entrySet().iterator();
+      while (iter.hasNext()) {
+        if (!verifyChunk(iter.next().getValue())) {
+          iter.remove();
+        }
+        ++processed;
+        int approxElementsLeft = chunkCache.size() - processed;
+        long sleepMs = 1;
+        if (approxElementsLeft > 0) {
+          // Once the pass is behind schedule the remaining time is negative. Thread.sleep()
+          // rejects negative values with an exception that would end this thread for good,
+          // so do not sleep at all in that case.
+          long nanosLeft = endTime - System.nanoTime();
+          sleepMs = Math.max(0L, nanosLeft / (1000000L * approxElementsLeft));
+        }
+        Thread.sleep(sleepMs);
+      }
+    }
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/Loader.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.llap.loader;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.api.impl.VectorImpl;
+import org.apache.hadoop.hive.llap.cache.BufferPool;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriterImpl;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.apache.hadoop.hive.llap.processor.ChunkConsumer;
+import org.apache.hadoop.hive.llap.processor.ChunkProducerFeedback;
+
+// TODO: write unit tests if this class becomes less primitive.
+/**
+ * Base class for loaders that read data into buffers from a {@link BufferPool} and pass
+ * the resulting vectors to a {@link ChunkConsumer}. Subclasses implement
+ * {@link #loadInternal}; this class provides the helpers for obtaining and recycling
+ * partially filled buffers and for merging chunks written for a single column.
+ */
+public abstract class Loader {
+  // For now, we have one buffer pool. Add bufferpool per load when needed.
+  private final BufferPool bufferPool;
+  /** Buffers that still have free space after a column was written and can be written to again. */
+  private final ConcurrentLinkedQueue<BufferInProgress> reusableBuffers =
+      new ConcurrentLinkedQueue<BufferInProgress>();
+  protected final ChunkWriterImpl writer;
+
+
+  /**
+   * @param bufferPool Pool from which new buffers are allocated.
+   */
+  public Loader(BufferPool bufferPool) {
+    this.bufferPool = bufferPool;
+    this.writer = new ChunkWriterImpl();
+  }
+
+  /**
+   * Per-load state handed to the consumer as its {@link ChunkProducerFeedback}; lets the
+   * consumer return vectors it is done with and request that the load be stopped.
+   */
+  protected class LoadContext implements ChunkProducerFeedback {
+    /** Set by {@link #stop()}; volatile because it is written via the feedback interface. */
+    public volatile boolean isStopped = false;
+
+    @Override
+    public void returnCompleteVector(Vector vector) {
+      Loader.this.returnCompleteVector(vector);
+    }
+
+    @Override
+    public void stop() {
+      isStopped = true;
+    }
+  }
+
+  /**
+   * Loads the data for a request and feeds it to the consumer.
+   * The consumer is initialized with a fresh {@link LoadContext} before loading starts.
+   * @param request The request to load.
+   * @param consumer The consumer that receives the results.
+   * @throws IOException Propagated from {@link #loadInternal}.
+   * @throws InterruptedException Propagated from {@link #loadInternal}.
+   */
+  public final void load(RequestImpl request, ChunkConsumer consumer)
+      throws IOException, InterruptedException {
+    // TODO: this API is subject to change, just a placeholder. Ideally we want to refactor
+    //       so that working with cache and buffer allocation/locking would be here, but right
+    //       now it depends on OrcLoader (esp. locking is hard to pull out).
+    LoadContext context = new LoadContext();
+    consumer.init(context); // passed as ChunkProducerFeedback
+    loadInternal(request, consumer, context);
+  }
+
+  /**
+   * Unlocks all cache buffers backing a vector that the reader has finished with.
+   * @param vector The vector being returned; must be a {@link VectorImpl}.
+   */
+  private void returnCompleteVector(Vector vector) {
+    VectorImpl vectorImpl = (VectorImpl)vector;
+    for (BufferPool.WeakBuffer buffer : vectorImpl.getCacheBuffers()) {
+      if (DebugUtils.isTraceLockingEnabled()) {
+        Llap.LOG.info("Unlocking " + buffer + " because reader is done");
+      }
+      buffer.unlock();
+    }
+  }
+
+  // TODO: this API is subject to change, just a placeholder.
+  /**
+   * Performs the actual load; called by {@link #load} after the consumer has been initialized.
+   * @param request The request to load.
+   * @param consumer The consumer that receives the results.
+   * @param context The context that was passed to the consumer as feedback.
+   */
+  protected abstract void loadInternal(RequestImpl request, ChunkConsumer consumer,
+      LoadContext context) throws IOException, InterruptedException;
+
+  /**
+   * Gets a buffer to write to: a previously returned reusable buffer if one is available and
+   * can still be locked, otherwise a newly allocated one. The underlying buffer is guaranteed
+   * to be a member of resultBuffers when this method returns.
+   * @param resultBuffers Buffers already used by the current result; the chosen buffer is
+   *                      added here if not yet present.
+   * @return The buffer to write to.
+   * @throws InterruptedException Propagated from buffer allocation.
+   */
+  protected final BufferInProgress prepareReusableBuffer(
+      HashSet<WeakBuffer> resultBuffers) throws InterruptedException {
+    while (true) {
+      BufferInProgress buf = reusableBuffers.poll();
+      if (buf == null) {
+        WeakBuffer newWb = bufferPool.allocateBuffer();
+        if (!resultBuffers.add(newWb)) {
+          throw new AssertionError("Cannot add new buffer to resultBuffers");
+        }
+        return new BufferInProgress(newWb);
+      }
+      if (resultBuffers.add(buf.buffer)) {
+        // Buffer is new to this result, so this result needs its own lock on it.
+        if (!buf.buffer.lock(true)) {
+          resultBuffers.remove(buf.buffer);
+          continue;  // Buffer was evicted.
+        }
+        if (DebugUtils.isTraceLockingEnabled()) {
+          Llap.LOG.info("Locked " + buf.buffer + " due to reuse");
+        }
+      } else if (!buf.buffer.isLocked()) {
+        throw new AssertionError(buf.buffer + " is in resultBuffers, but is not locked");
+      }
+      // The buffer is locked and tracked in resultBuffers; hand it out. Without this return
+      // the loop would drop every reusable buffer and always end up allocating a new one.
+      return buf;
+    }
+  }
+
+  /**
+   * Puts a buffer back on the reuse queue unless it has too little space left to be useful.
+   * @param colBuffer The buffer that the caller has finished writing to.
+   */
+  protected final void returnReusableBuffer(BufferInProgress colBuffer) {
+    // Check space - 16 is chunk header plus one segment header, minimum required space.
+    // This should be extremely rare.
+    // TODO: use different value that makes some sense
+    // TODO: with large enough stripes it might be better not to split every stripe into two
+    //       buffers but instead not reuse the buffer if e.g. 1Mb/15Mb is left.
+    if (colBuffer.getSpaceLeft() < 16) return;
+    reusableBuffers.add(colBuffer);
+  }
+
+  /**
+   * Extracts the chunk currently being written in colBuffer and appends it to the result
+   * built so far for the same column.
+   * @param colBuffer The buffer holding the chunk in progress.
+   * @param existingResult Chunks already produced for this column; may be null.
+   * @param finalCheck Whether this is the last merge for the column; if so, having no rows
+   *                   at all is treated as an error.
+   * @return existingResult if the extracted chunk has no rows; otherwise the extracted chunk,
+   *         or the result of adding it to existingResult.
+   * @throws IOException If finalCheck is set and no rows were written for the column.
+   */
+  protected Chunk mergeResultChunks(BufferInProgress colBuffer,
+      Chunk existingResult, boolean finalCheck) throws IOException {
+    // Both should be extracted in one method, but it's too painful to do in Java.
+    int rowCount = colBuffer.getChunkInProgressRows();
+    Chunk chunk = colBuffer.extractChunk();
+    if (rowCount <= 0) {
+      if (finalCheck && existingResult == null) {
+        throw new IOException("No rows written for column");
+      }
+      return existingResult;
+    }
+    writer.finishChunk(chunk, rowCount);
+    return (existingResult == null) ? chunk : existingResult.addChunk(chunk);
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/loader/OrcLoader.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.loader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.api.Llap;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.impl.RequestImpl;
+import org.apache.hadoop.hive.llap.api.impl.VectorImpl;
+import org.apache.hadoop.hive.llap.cache.BufferPool;
+import org.apache.hadoop.hive.llap.cache.BufferPool.WeakBuffer;
+import org.apache.hadoop.hive.llap.cache.MetadataCache;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriterImpl;
+import org.apache.hadoop.hive.llap.loader.ChunkPool.Chunk;
+import org.apache.hadoop.hive.llap.processor.ChunkConsumer;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type.Kind;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+public class OrcLoader extends Loader {
+  private final ChunkPool<ChunkKey> chunkPool;
+  private FileSystem cachedFs = null;
+  private final MetadataCache metadataCache = new MetadataCache();
+
+  public OrcLoader(BufferPool bufferPool, ChunkPool<ChunkKey> chunkPool,
+      Configuration conf) throws IOException {
+    super(bufferPool);
+    this.chunkPool = chunkPool;
+    // We assume all splits will come from the same FS.
+    this.cachedFs = FileSystem.get(conf);
+  }
+
+  @Override
+  protected void loadInternal(RequestImpl request, ChunkConsumer consumer, LoadContext context)
+      throws IOException, InterruptedException {
+    // TODO: decide on - LocalActorSystem.INSTANCE.enqueue(request, bufferPool, consumer);
+    if (DebugUtils.isTraceMttEnabled()) {
+      Llap.LOG.info("loadInternal called");
+    }
+    List<Integer> includedCols = request.getColumns();
+    if (includedCols != null) {
+      Collections.sort(includedCols);
+    }
+    FileSplit fileSplit = (FileSplit)request.getSplit();
+    String internedFilePath = fileSplit.getPath().toString().intern();
+    Llap.LOG.info("Processing split for " + internedFilePath);
+    if (context.isStopped) return;
+    List<StripeInformation> stripes = metadataCache.getStripes(internedFilePath);
+    List<Type> types = metadataCache.getTypes(internedFilePath);
+    Reader reader = null;
+    if (stripes == null || types == null) {
+      reader = createReader(fileSplit);
+      if (stripes == null) {
+        stripes = reader.getStripes();
+        metadataCache.cacheStripes(internedFilePath, stripes);
+      }
+      if (types == null) {
+        types = reader.getTypes();
+        metadataCache.cacheTypes(internedFilePath, types);
+      }
+    }
+
+    // Determine which stripes belong to this split and make keys to get chunks from cache.
+    // This assumes all splits will have the same columns.
+    if (includedCols == null) {
+      includedCols = new ArrayList<Integer>(types.size());
+      for (int i = 1; i < types.size(); ++i) {
+        includedCols.add(i);
+      }
+    }
+    List<List<ChunkKey>> keys = new ArrayList<List<ChunkKey>>();
+    long stripeIxFromAndTo = determineStripesAndCacheKeys(
+        fileSplit, includedCols, internedFilePath, stripes, keys);
+    int stripeIxFrom = (int)(stripeIxFromAndTo >>> 32),
+        stripeIxTo = (int)(stripeIxFromAndTo & (long)Integer.MAX_VALUE);
+
+    // Prepare structures for tracking the results.
+    int resultVectorCount = stripeIxTo - stripeIxFrom;
+    Chunk[][] resultMatrix = new Chunk[resultVectorCount][];
+    @SuppressWarnings("unchecked")
+    // TODO: we store result buffers uniquely in a set, so we could lock/unlock them once. This may
+    //       be more expensive than just making locking faster, and locking-unlocking as needed.
+    HashSet<WeakBuffer>[] resultBuffers = new HashSet[resultVectorCount];
+    for (int i = 0; i < resultVectorCount; ++i) {
+      resultMatrix[i] = new Chunk[types.size()];
+      resultBuffers[i] = new HashSet<WeakBuffer>();
+    }
+    if (context.isStopped) return;
+    // TODO: after this moment, we must be careful when checking isStopped to avoid
+    //       leaving some chunks locked and un-consumed. For now we just never check.
+
+    // For now we will fetch missing results by stripe - this is how reader needs them.
+    List<Integer> readyStripes = getChunksFromCache(
+        keys, types.size(), stripeIxFrom, resultBuffers, resultMatrix);
+    if (readyStripes != null) {
+      Llap.LOG.info("Got " + readyStripes.size() + " full stripes from cache");
+      for (Integer stripeIx : readyStripes) {
+        int stripeIxMod = stripeIx - stripeIxFrom;
+        VectorImpl vector = createVectorForStripe(
+            resultMatrix[stripeIxMod], resultBuffers[stripeIxMod], types, includedCols);
+        if (DebugUtils.isTraceMttEnabled()) {
+          Llap.LOG.info("Returning stripe " + stripeIx + " from cache");
+        }
+        consumer.consumeVector(vector);
+        resultMatrix[stripeIxMod] = null;
+        resultBuffers[stripeIxMod] = null;
+      }
+    }
+
+    // Now we have a set of keys for all the things that are missing. Fetch them...
+    // TODO: this should happen on some sort of IO thread pool.
+    for (List<ChunkKey> stripeKeys : keys) {
+      if (stripeKeys.isEmpty()) continue;
+      int stripeIx = stripeKeys.get(0).stripeIx;
+      StripeInformation si = stripes.get(stripeIx);
+      List<Integer> includeList = null;
+      if (includedCols.size() == stripeKeys.size()) {
+        includeList = includedCols;
+      } else {
+        includeList = new ArrayList<Integer>(stripeKeys.size());
+        for (ChunkKey key : stripeKeys) {
+          includeList.add(key.colIx);
+        }
+      }
+      boolean[] includes = OrcInputFormat.genIncludedColumns(types, includeList, true);
+      if (Llap.LOG.isDebugEnabled()) {
+        Llap.LOG.debug("Reading stripe " + stripeIx + " {"
+          + si.getOffset() + ", " + si.getLength() + "}, cols " + Arrays.toString(includes));
+      }
+
+      if (reader == null) {
+        reader = createReader(fileSplit);
+      }
+
+      RecordReader stripeReader = reader.rows(si.getOffset(), si.getLength(), includes);
+      int stripeIxMod = stripeIx - stripeIxFrom;
+      Chunk[] result = resultMatrix[stripeIxMod];
+      HashSet<WeakBuffer> buffers = resultBuffers[stripeIxMod];
+
+      loadStripe(stripeReader, stripeKeys, result, buffers);
+      stripeReader.close();
+      VectorImpl vector = createVectorForStripe(result, buffers, types, includedCols);
+      if (DebugUtils.isTraceMttEnabled()) {
+        Llap.LOG.info("Returning stripe " + stripeIx + " from FS");
+      }
+      consumer.consumeVector(vector);
+    }
+    consumer.setDone();
+    if (DebugUtils.isTraceMttEnabled()) {
+      Llap.LOG.info("loadInternal is done");
+    }
+  }
+
+  /**
+   * Determines which stripe range belongs to a split, and generates cache keys
+   *  for all these stripes and all the included columns.
+   * @param fileSplit The split.
+   * @param includedCols Included columns.
+   * @param internedFilePath Interned file path from the split, for cache keys.
+   * @param stripes Stripe information from the reader.
+   * @param keys The keys for cache lookups are inserted here.
+   * @return Combined int-s for stripe from (inc.) and to (exc.) indexes, because Java is a joke
+   */
+  private long determineStripesAndCacheKeys(FileSplit fileSplit, List<Integer> includedCols,
+      String internedFilePath, List<StripeInformation> stripes, List<List<ChunkKey>> keys) {
+    // The unit of caching for ORC is (stripe x column) (see ChunkKey). Note that we do not use
+    // SARG anywhere, because file-level filtering on sarg is already performed during split
+    // generation, and stripe-level filtering to get row groups is not very helpful right now.
+    long offset = fileSplit.getStart(), maxOffset = offset + fileSplit.getLength();
+    int stripeIxFrom = -1, stripeIxTo = -1, stripeIx = 0;
+    if (Llap.LOG.isDebugEnabled()) {
+      String tmp = "FileSplit {" + fileSplit.getStart()
+          + ", " + fileSplit.getLength() + "}; stripes ";
+      for (StripeInformation stripe : stripes) {
+        tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, ";
+      }
+      Llap.LOG.debug(tmp);
+    }
+
+    for (StripeInformation stripe : stripes) {
+      long stripeStart = stripe.getOffset();
+      if (offset > stripeStart) continue;
+      if (stripeIxFrom == -1) {
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Including from " + stripeIx
+              + " (" + stripeStart + " >= " + offset + ")");
+        }
+        stripeIxFrom = stripeIx;
+      }
+      if (stripeStart >= maxOffset) {
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Including until " + stripeIxTo
+              + " (" + stripeStart + " >= " + maxOffset + ")");
+        }
+        stripeIxTo = stripeIx;
+        break;
+      }
+
+      ArrayList<ChunkKey> stripeKeys = new ArrayList<ChunkKey>(includedCols.size());
+      keys.add(stripeKeys);
+      for (Integer colIx : includedCols) {
+        stripeKeys.add(new ChunkKey(internedFilePath, stripeIx, colIx));
+      }
+      ++stripeIx;
+    }
+    if (stripeIxTo == -1) {
+      if (DebugUtils.isTraceEnabled()) {
+        Llap.LOG.info("Including until " + stripeIx + " (end of file)");
+      }
+      stripeIxTo = stripeIx;
+    }
+    return (((long)stripeIxFrom) << 32) + stripeIxTo;
+  }
+
+  /**
+   * Gets chunks from cache and generates include arrays for things to be fetched.
+   * @param keys Keys to get.
+   * @param colCount Column count in the file.
+   * @param stripeIxFrom Stripe index start in the split.
+   * @param resultBuffers Resulting buffers are added here.
+   * @param resultMatrix Results that are fetched from cache are added here.
+   * @return Matrix of things are not cache.
+   */
+  private List<Integer> getChunksFromCache(List<List<ChunkKey>> keys, int colCount,
+      int stripeIxFrom, HashSet<WeakBuffer>[] resultBuffers, Chunk[][] resultMatrix) {
+    List<Integer> readyStripes = null;
+    for (List<ChunkKey> stripeKeys : keys) {
+      int stripeIx = stripeKeys.get(0).stripeIx;
+      int stripeIxMod = stripeIx - stripeIxFrom;
+      Chunk[] chunksForStripe = resultMatrix[stripeIxMod];
+      HashSet<WeakBuffer> buffersForStripe = resultBuffers[stripeIxMod];
+      Iterator<ChunkKey> iter = stripeKeys.iterator();
+      while (iter.hasNext()) {
+        ChunkKey key = iter.next();
+        Chunk result = chunkPool.getChunk(key, buffersForStripe);
+        if (result == null) continue;
+        if (Llap.LOG.isDebugEnabled()) {
+          Llap.LOG.debug("Found result in cache for " + key + ": " + result.toFullString());
+        }
+        chunksForStripe[key.colIx] = result;
+        iter.remove();
+      }
+      if (stripeKeys.isEmpty()) {
+        if (readyStripes == null) {
+          readyStripes = new ArrayList<Integer>();
+        }
+        readyStripes.add(stripeIx);
+      }
+    }
+    return readyStripes;
+  }
+
+  private Reader createReader(FileSplit fileSplit) throws IOException {
+    FileSystem fs = cachedFs;
+    Path path = fileSplit.getPath();
+    Configuration conf = new Configuration();
+    if ("pfile".equals(path.toUri().getScheme())) {
+      fs = path.getFileSystem(conf); // Cannot use cached FS due to hive tests' proxy FS.
+    }
+    return OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
+  }
+
+  private void loadStripe(RecordReader reader, List<ChunkKey> keys, Chunk[] results,
+      HashSet<WeakBuffer> resultBuffers) throws IOException, InterruptedException {
+    // Reader is reading a single stripe; read the entirety of each column.
+    Object readCtx = reader.prepareColumnRead();
+    for (int keyIx = 0; keyIx < keys.size(); ++keyIx) {
+      ChunkKey key = keys.get(keyIx);
+      BufferInProgress colBuffer = null;
+      while (true) {
+        colBuffer = prepareReusableBuffer(resultBuffers);
+        writer.prepare(colBuffer);
+        boolean hasMoreValues = reader.readNextColumnStripe(readCtx, writer);
+        if (!hasMoreValues) break;
+        if (DebugUtils.isTraceEnabled()) {
+          Llap.LOG.info("Stripe doesn't fit into buffer");
+        }
+        // We couldn't write all rows to this buffer, so we'll close the chunk.
+        results[key.colIx] = mergeResultChunks(colBuffer, results[key.colIx], false);
+      }
+      // Done with the reader:
+      // 1) add final chunk to result;
+      // 2) add reusable buffer back to list;
+      // 3) add results to cache and resolve conflicts.
+      Chunk val = results[key.colIx] =
+          mergeResultChunks(colBuffer, results[key.colIx], true);
+      if (Llap.LOG.isDebugEnabled()) {
+        Llap.LOG.debug("Caching chunk " + key + " => " + val.toFullString());
+      }
+      Chunk cachedVal = chunkPool.addOrGetChunk(key, val, resultBuffers);
+      if (cachedVal != val) {
+        // Someone else has read and cached the same value while we were reading. Assumed to be
+        // very rare (otherwise we'd need measures to prevent it), so we will not be efficient;
+        // we will rebuild resultBuffers rather than removing buffers from them.
+        results[key.colIx] = cachedVal;
+        resultBuffers.clear();
+        for (int i = 0; i < results.length; ++i) {
+          Chunk chunk1 = results[i];
+          while (chunk1 != null) {
+            resultBuffers.add(chunk1.buffer);
+            chunk1 = chunk1.nextChunk;
+          }
+        }
+        Chunk chunk = cachedVal;
+        while (chunk != null) {
+          if (!resultBuffers.contains(chunk.buffer)) {
+            chunk.buffer.unlock();
+          }
+          chunk = chunk.nextChunk;
+        }
+      }
+      returnReusableBuffer(colBuffer);
+    }
+  }
+
+  private VectorImpl createVectorForStripe(Chunk[] rowForStripe,
+      Collection<WeakBuffer> resultBuffers, List<Type> types, List<Integer> includedCols) {
+    VectorImpl vector = new VectorImpl(resultBuffers, types.size());
+    for (Integer colIx : includedCols) {
+      // TODO: this "+ 1" is a hack relying on knowledge of ORC. It might change, esp. w/ACID.
+      Vector.Type type = vectorTypeFromOrcType(types.get(colIx + 1).getKind());
+      vector.addChunk(colIx, rowForStripe[colIx], type);
+    }
+    return vector;
+  }
+
+  private static Vector.Type vectorTypeFromOrcType(Kind orcType) {
+    switch (orcType) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case DATE:
+      case TIMESTAMP:
+        return Vector.Type.LONG;
+      case FLOAT:
+      case DOUBLE:
+        return Vector.Type.DOUBLE;
+      case STRING:
+        return Vector.Type.BINARY;
+      case DECIMAL:
+        return Vector.Type.DECIMAL;
+      default:
+        throw new UnsupportedOperationException("Unsupported type " + orcType);
+    }
+  }
+
+  public static class ChunkKey {
+    /** @param file This MUST be interned by caller. */
+    private ChunkKey(String file, int stripeIx, int colIx) {
+      this.file = file;
+      this.stripeIx = stripeIx;
+      this.colIx = colIx;
+    }
+    private final String file;
+    private final int stripeIx;
+    private final int colIx;
+
+    @Override
+    public String toString() {
+      return "[" + file + ", stripe " + stripeIx + ", colIx " + colIx + "]";
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = prime + ((file == null) ? 0 : System.identityHashCode(file));
+      return (prime * result + colIx) * prime + stripeIx;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (!(obj instanceof ChunkKey)) return false;
+      ChunkKey other = (ChunkKey)obj;
+      // Strings are interned and can thus be compared like this.
+      return stripeIx == other.stripeIx && colIx == other.colIx && file == other.file;
+    }
+  }
+}

Added: hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java (added)
+++ hive/branches/llap/llap/src/java/org/apache/hadoop/hive/llap/processor/ChunkConsumer.java Tue Sep 16 17:50:02 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.processor;
+
+import org.apache.hadoop.hive.llap.api.Vector;
+
+/**
+ * Callback interface implemented by the reader, through which it is handed
+ * loaded data asynchronously.
+ */
+public interface ChunkConsumer {
+  /**
+   * Passes the consumer the feedback object for the load that is about to run.
+   * @param feedback Channel from the consumer back to the producer.
+   */
+  void init(ChunkProducerFeedback feedback);
+
+  /** Signals that no more vectors will be passed to this consumer. */
+  void setDone();
+
+  /**
+   * Hands one vector to the consumer.
+   * For now this takes a Vector, which has to have full rows. Vectorization cannot run on
+   * non-full rows anyway so that's ok. Maybe later we can have LazyVRB which only loads
+   * columns when needed... one can dream right?
+   * @param vector The vector to consume.
+   */
+  void consumeVector(Vector vector);
+
+  /**
+   * Reports a failure to the consumer.
+   * @param t The error; presumably the failure that aborted the load - confirm with callers.
+   */
+  void setError(Throwable t);
+}