Posted to commits@hive.apache.org by om...@apache.org on 2015/12/12 00:28:09 UTC

[12/16] hive git commit: HIVE-11890. Create ORC submodule. (omalley reviewed by prasanthj)

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
new file mode 100644
index 0000000..b1c6de5
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/InStream.java
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.orc.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedInputStream;
+
+public abstract class InStream extends InputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
+  private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
+
+  protected final String name;
+  protected long length;
+
+  public InStream(String name, long length) {
+    this.name = name;
+    this.length = length;
+  }
+
+  public String getStreamName() {
+    return name;
+  }
+
+  public long getStreamLength() {
+    return length;
+  }
+
+  public static class UncompressedStream extends InStream {
+    private List<DiskRange> bytes;
+    private long length;
+    protected long currentOffset;
+    private ByteBuffer range;
+    private int currentRange;
+
+    public UncompressedStream(String name, List<DiskRange> input, long length) {
+      super(name, length);
+      reset(input, length);
+    }
+
+    protected void reset(List<DiskRange> input, long length) {
+      this.bytes = input;
+      this.length = length;
+      currentRange = 0;
+      currentOffset = 0;
+      range = null;
+    }
+
+    @Override
+    public int read() {
+      if (range == null || range.remaining() == 0) {
+        if (currentOffset == length) {
+          return -1;
+        }
+        seek(currentOffset);
+      }
+      currentOffset += 1;
+      return 0xff & range.get();
+    }
+
+    @Override
+    public int read(byte[] data, int offset, int length) {
+      if (range == null || range.remaining() == 0) {
+        if (currentOffset == this.length) {
+          return -1;
+        }
+        seek(currentOffset);
+      }
+      int actualLength = Math.min(length, range.remaining());
+      range.get(data, offset, actualLength);
+      currentOffset += actualLength;
+      return actualLength;
+    }
+
+    @Override
+    public int available() {
+      if (range != null && range.remaining() > 0) {
+        return range.remaining();
+      }
+      return (int) (length - currentOffset);
+    }
+
+    @Override
+    public void close() {
+      currentRange = bytes.size();
+      currentOffset = length;
+      // explicit de-ref of bytes[]
+      bytes.clear();
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      seek(index.getNext());
+    }
+
+    public void seek(long desired) {
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for (DiskRange curRange : bytes) {
+        if (desired == 0 && curRange.getData().remaining() == 0) {
+          logEmptySeek(name);
+          return;
+        }
+        if (curRange.getOffset() <= desired &&
+            (desired - curRange.getOffset()) < curRange.getLength()) {
+          currentOffset = desired;
+          currentRange = i;
+          this.range = curRange.getData().duplicate();
+          int pos = range.position();
+          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+          this.range.position(pos);
+          return;
+        }
+        ++i;
+      }
+      // if they are seeking to the precise end, go ahead and let them go there
+      int segments = bytes.size();
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+        currentOffset = desired;
+        currentRange = segments - 1;
+        DiskRange curRange = bytes.get(currentRange);
+        this.range = curRange.getData().duplicate();
+        int pos = range.position();
+        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+        this.range.position(pos);
+        return;
+      }
+      throw new IllegalArgumentException("Seek in " + name + " to " +
+        desired + " is outside of the data");
+    }
+
+    @Override
+    public String toString() {
+      return "uncompressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+    }
+  }
+
+  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+    // TODO: use the same pool as the ORC readers
+    if (isDirect) {
+      return ByteBuffer.allocateDirect(size);
+    } else {
+      return ByteBuffer.allocate(size);
+    }
+  }
+
+  private static class CompressedStream extends InStream {
+    private final List<DiskRange> bytes;
+    private final int bufferSize;
+    private ByteBuffer uncompressed;
+    private final CompressionCodec codec;
+    private ByteBuffer compressed;
+    private long currentOffset;
+    private int currentRange;
+    private boolean isUncompressedOriginal;
+
+    public CompressedStream(String name, List<DiskRange> input, long length,
+                            CompressionCodec codec, int bufferSize) {
+      super(name, length);
+      this.bytes = input;
+      this.codec = codec;
+      this.bufferSize = bufferSize;
+      currentOffset = 0;
+      currentRange = 0;
+    }
+
+    private void allocateForUncompressed(int size, boolean isDirect) {
+      uncompressed = allocateBuffer(size, isDirect);
+    }
+
+    private void readHeader() throws IOException {
+      if (compressed == null || compressed.remaining() <= 0) {
+        seek(currentOffset);
+      }
+      if (compressed.remaining() > OutStream.HEADER_SIZE) {
+        int b0 = compressed.get() & 0xff;
+        int b1 = compressed.get() & 0xff;
+        int b2 = compressed.get() & 0xff;
+        boolean isOriginal = (b0 & 0x01) == 1;
+        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
+        if (chunkLength > bufferSize) {
+          throw new IllegalArgumentException("Buffer size too small. size = " +
+              bufferSize + " needed = " + chunkLength);
+        }
+        // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
+        assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+        currentOffset += OutStream.HEADER_SIZE;
+
+        ByteBuffer slice = this.slice(chunkLength);
+
+        if (isOriginal) {
+          uncompressed = slice;
+          isUncompressedOriginal = true;
+        } else {
+          if (isUncompressedOriginal) {
+            allocateForUncompressed(bufferSize, slice.isDirect());
+            isUncompressedOriginal = false;
+          } else if (uncompressed == null) {
+            allocateForUncompressed(bufferSize, slice.isDirect());
+          } else {
+            uncompressed.clear();
+          }
+          codec.decompress(slice, uncompressed);
+        }
+      } else {
+        throw new IllegalStateException("Can't read header at " + this);
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (uncompressed == null || uncompressed.remaining() == 0) {
+        if (currentOffset == length) {
+          return -1;
+        }
+        readHeader();
+      }
+      return 0xff & uncompressed.get();
+    }
+
+    @Override
+    public int read(byte[] data, int offset, int length) throws IOException {
+      if (uncompressed == null || uncompressed.remaining() == 0) {
+        if (currentOffset == this.length) {
+          return -1;
+        }
+        readHeader();
+      }
+      int actualLength = Math.min(length, uncompressed.remaining());
+      uncompressed.get(data, offset, actualLength);
+      return actualLength;
+    }
+
+    @Override
+    public int available() throws IOException {
+      if (uncompressed == null || uncompressed.remaining() == 0) {
+        if (currentOffset == length) {
+          return 0;
+        }
+        readHeader();
+      }
+      return uncompressed.remaining();
+    }
+
+    @Override
+    public void close() {
+      uncompressed = null;
+      compressed = null;
+      currentRange = bytes.size();
+      currentOffset = length;
+      bytes.clear();
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      seek(index.getNext());
+      long uncompressedBytes = index.getNext();
+      if (uncompressedBytes != 0) {
+        readHeader();
+        uncompressed.position(uncompressed.position() +
+                              (int) uncompressedBytes);
+      } else if (uncompressed != null) {
+        // mark the uncompressed buffer as done
+        uncompressed.position(uncompressed.limit());
+      }
+    }
+
+    /* slices a read-only contiguous buffer of chunkLength bytes */
+    private ByteBuffer slice(int chunkLength) throws IOException {
+      int len = chunkLength;
+      final long oldOffset = currentOffset;
+      ByteBuffer slice;
+      if (compressed.remaining() >= len) {
+        slice = compressed.slice();
+        // simple case
+        slice.limit(len);
+        currentOffset += len;
+        compressed.position(compressed.position() + len);
+        return slice;
+      } else if (currentRange >= (bytes.size() - 1)) {
+        // nothing has been modified yet
+        throw new IOException("EOF in " + this + " while trying to read " +
+            chunkLength + " bytes");
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+            compressed.remaining(), len));
+      }
+
+      // we need to consolidate 2 or more buffers into 1
+      // first copy out compressed buffers
+      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+      currentOffset += compressed.remaining();
+      len -= compressed.remaining();
+      copy.put(compressed);
+      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
+
+      while (len > 0 && iter.hasNext()) {
+        ++currentRange;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+        }
+        DiskRange range = iter.next();
+        compressed = range.getData().duplicate();
+        if (compressed.remaining() >= len) {
+          slice = compressed.slice();
+          slice.limit(len);
+          copy.put(slice);
+          currentOffset += len;
+          compressed.position(compressed.position() + len);
+          return copy;
+        }
+        currentOffset += compressed.remaining();
+        len -= compressed.remaining();
+        copy.put(compressed);
+      }
+
+      // restore offsets for exception clarity
+      seek(oldOffset);
+      throw new IOException("EOF in " + this + " while trying to read " +
+          chunkLength + " bytes");
+    }
+
+    private void seek(long desired) throws IOException {
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for (DiskRange range : bytes) {
+        if (range.getOffset() <= desired && desired < range.getEnd()) {
+          currentRange = i;
+          compressed = range.getData().duplicate();
+          int pos = compressed.position();
+          pos += (int)(desired - range.getOffset());
+          compressed.position(pos);
+          currentOffset = desired;
+          return;
+        }
+        ++i;
+      }
+      // if they are seeking to the precise end, go ahead and let them go there
+      int segments = bytes.size();
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+        DiskRange range = bytes.get(segments - 1);
+        currentRange = segments - 1;
+        compressed = range.getData().duplicate();
+        compressed.position(compressed.limit());
+        currentOffset = desired;
+        return;
+      }
+      throw new IOException("Seek outside of data in " + this + " to " + desired);
+    }
+
+    private String rangeString() {
+      StringBuilder builder = new StringBuilder();
+      int i = 0;
+      for (DiskRange range : bytes) {
+        if (i != 0) {
+          builder.append("; ");
+        }
+        builder.append(" range " + i + " = " + range.getOffset()
+            + " to " + (range.getEnd() - range.getOffset()));
+        ++i;
+      }
+      return builder.toString();
+    }
+
+    @Override
+    public String toString() {
+      return "compressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+          rangeString() +
+          (uncompressed == null ? "" :
+              " uncompressed: " + uncompressed.position() + " to " +
+                  uncompressed.limit());
+    }
+  }
+
+  public abstract void seek(PositionProvider index) throws IOException;
+
+  private static void logEmptySeek(String name) {
+    if (LOG.isWarnEnabled()) {
+      LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
+    }
+  }
+
+  /**
+   * Create an input stream from a list of buffers.
+   * @param streamName the name of the stream
+   * @param buffers the list of ranges of bytes for the stream
+   * @param offsets a list of offsets (the same length as buffers) that must
+   *                contain the first offset of each set of bytes in buffers
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
+  @VisibleForTesting
+  @Deprecated
+  public static InStream create(String streamName,
+                                ByteBuffer[] buffers,
+                                long[] offsets,
+                                long length,
+                                CompressionCodec codec,
+                                int bufferSize) throws IOException {
+    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+    for (int i = 0; i < buffers.length; ++i) {
+      input.add(new BufferChunk(buffers[i], offsets[i]));
+    }
+    return create(streamName, input, length, codec, bufferSize);
+  }
+
+  /**
+   * Create an input stream from a list of disk ranges with data.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream; from disk or cache
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
+  public static InStream create(String name,
+                                List<DiskRange> input,
+                                long length,
+                                CompressionCodec codec,
+                                int bufferSize) throws IOException {
+    if (codec == null) {
+      return new UncompressedStream(name, input, length);
+    } else {
+      return new CompressedStream(name, input, length, codec, bufferSize);
+    }
+  }
+
+  /**
+   * Creates a coded input stream (used for protobuf message parsing) with a
+   * higher message size limit.
+   *
+   * @param name       the name of the stream
+   * @param input      the list of ranges of bytes for the stream; from disk or cache
+   * @param length     the length in bytes of the stream
+   * @param codec      the compression codec
+   * @param bufferSize the compression buffer size
+   * @return coded input stream
+   * @throws IOException
+   */
+  public static CodedInputStream createCodedInputStream(
+      String name,
+      List<DiskRange> input,
+      long length,
+      CompressionCodec codec,
+      int bufferSize) throws IOException {
+    InStream inStream = create(name, input, length, codec, bufferSize);
+    CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
+    codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
+    return codedInputStream;
+  }
+}
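
Note on the chunk framing above: CompressedStream.readHeader() expects each
compressed chunk to start with the three-byte header that OutStream.writeHeader()
emits, where the low bit of the first byte flags an uncompressed ("original")
chunk and the remaining 23 bits carry the chunk length. A minimal round-trip
sketch of that layout (the helper names are illustrative, not part of the patch):

    // Encode/decode the 3-byte ORC chunk header, mirroring
    // OutStream.writeHeader and CompressedStream.readHeader.
    static byte[] encodeHeader(int chunkLength, boolean original) {
      return new byte[]{
          (byte) ((chunkLength << 1) + (original ? 1 : 0)), // length bits 0-6 + flag
          (byte) (chunkLength >> 7),                        // length bits 7-14
          (byte) (chunkLength >> 15)                        // length bits 15-22
      };
    }

    static int decodeChunkLength(byte[] h) {
      int b0 = h[0] & 0xff, b1 = h[1] & 0xff, b2 = h[2] & 0xff;
      return (b2 << 15) | (b1 << 7) | (b0 >> 1);
    }

    static boolean isOriginal(byte[] h) {
      return (h[0] & 0x01) == 1;
    }

For example, a 256K compressed chunk encodes as the bytes 0x00 0x00 0x08.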

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
new file mode 100644
index 0000000..b928559
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * Interface for reading integers.
+ */
+public interface IntegerReader {
+
+  /**
+   * Seek to the position provided by index.
+   * @param index the position to seek to
+   * @throws IOException
+   */
+  void seek(PositionProvider index) throws IOException;
+
+  /**
+   * Skip the specified number of values.
+   * @param numValues the number of values to skip
+   * @throws IOException
+   */
+  void skip(long numValues) throws IOException;
+
+  /**
+   * Check if there are any more values left.
+   * @return true if there are more values to read
+   * @throws IOException
+   */
+  boolean hasNext() throws IOException;
+
+  /**
+   * Return the next available value.
+   * @return the next value
+   * @throws IOException
+   */
+  long next() throws IOException;
+
+  /**
+   * Fill the given vector with the next available values.
+   * @throws IOException
+   */
+  void nextVector(LongColumnVector previous, long previousLen)
+      throws IOException;
+
+  void setInStream(InStream data);
+}
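
The concrete IntegerReader implementations (the run-length readers) are added
in other files of this patch series; a hypothetical consumer loop over the
interface looks like this:

    // Hypothetical: seek to a recorded row-group start, then sum the
    // remaining values.
    static long sum(IntegerReader reader, PositionProvider rowGroupStart)
        throws IOException {
      reader.seek(rowGroupStart);
      long total = 0;
      while (reader.hasNext()) {
        total += reader.next();
      }
      return total;
    }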

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerWriter.java b/orc/src/java/org/apache/orc/impl/IntegerWriter.java
new file mode 100644
index 0000000..419054f
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+public interface IntegerWriter {
+
+  /**
+   * Record the current position to the recorder.
+   * @param recorder the object that receives the position
+   * @throws IOException
+   */
+  void getPosition(PositionRecorder recorder) throws IOException;
+
+  /**
+   * Write the integer value.
+   * @param value the value to write
+   * @throws IOException
+   */
+  void write(long value) throws IOException;
+
+  /**
+   * Flush the buffered bytes.
+   * @throws IOException
+   */
+  void flush() throws IOException;
+}
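
The writer side mirrors the reader: positions recorded through getPosition()
are what a reader later replays through seek(). A hypothetical sketch:

    // Hypothetical: record a seek point, emit the values, and flush the
    // buffered bytes at the end of the stripe.
    static void writeValues(IntegerWriter writer, PositionRecorder recorder,
                            long[] values) throws IOException {
      writer.getPosition(recorder);  // remember where these values start
      for (long value : values) {
        writer.write(value);
      }
      writer.flush();
    }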

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/MemoryManager.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MemoryManager.java b/orc/src/java/org/apache/orc/impl/MemoryManager.java
new file mode 100644
index 0000000..2dbfba7
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MemoryManager.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By managing the size of each allocation, we try to cut down the size of each
+ * allocation and keep the task from running out of memory.
+ *
+ * This class is not thread safe: it expects that a given instance is created
+ * and invoked from a single thread, and it checks that at runtime.
+ */
+public class MemoryManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
+
+  /**
+   * How often should we check the memory sizes? Measured in rows added
+   * to all of the writers.
+   */
+  private static final int ROWS_BETWEEN_CHECKS = 5000;
+  private final long totalMemoryPool;
+  private final Map<Path, WriterInfo> writerList =
+      new HashMap<Path, WriterInfo>();
+  private long totalAllocation = 0;
+  private double currentScale = 1;
+  private int rowsAddedSinceCheck = 0;
+  private final OwnedLock ownerLock = new OwnedLock();
+
+  @SuppressWarnings("serial")
+  private static class OwnedLock extends ReentrantLock {
+    public Thread getOwner() {
+      return super.getOwner();
+    }
+  }
+
+  private static class WriterInfo {
+    long allocation;
+    Callback callback;
+    WriterInfo(long allocation, Callback callback) {
+      this.allocation = allocation;
+      this.callback = callback;
+    }
+  }
+
+  public interface Callback {
+    /**
+     * The writer needs to check its memory usage
+     * @param newScale the current scale factor for memory allocations
+     * @return true if the writer was over the limit
+     * @throws IOException
+     */
+    boolean checkMemory(double newScale) throws IOException;
+  }
+
+  /**
+   * Create the memory manager.
+   * @param conf use the configuration to find the maximum size of the memory
+   *             pool.
+   */
+  public MemoryManager(Configuration conf) {
+    double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
+    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+        getHeapMemoryUsage().getMax() * maxLoad);
+    ownerLock.lock();
+  }
+
+  /**
+   * Lightweight check that all access comes from the single owning thread.
+   */
+  private void checkOwner() {
+    Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(),
+        "Owner thread expected %s, got %s",
+        ownerLock.getOwner(),
+        Thread.currentThread());
+  }
+
+  /**
+   * Add a new writer's memory allocation to the pool. We use the path
+   * as a unique key to ensure that we don't get duplicates.
+   * @param path the file that is being written
+   * @param requestedAllocation the requested buffer size
+   */
+  public void addWriter(Path path, long requestedAllocation,
+                              Callback callback) throws IOException {
+    checkOwner();
+    WriterInfo oldVal = writerList.get(path);
+    // this should always be null, but we handle the case where the memory
+    // manager wasn't told that a writer is no longer in use and the task
+    // starts writing to the same path again.
+    if (oldVal == null) {
+      oldVal = new WriterInfo(requestedAllocation, callback);
+      writerList.put(path, oldVal);
+      totalAllocation += requestedAllocation;
+    } else {
+      // handle a new writer that is writing to the same path
+      totalAllocation += requestedAllocation - oldVal.allocation;
+      oldVal.allocation = requestedAllocation;
+      oldVal.callback = callback;
+    }
+    updateScale(true);
+  }
+
+  /**
+   * Remove the given writer from the pool.
+   * @param path the file that has been closed
+   */
+  public void removeWriter(Path path) throws IOException {
+    checkOwner();
+    WriterInfo val = writerList.get(path);
+    if (val != null) {
+      writerList.remove(path);
+      totalAllocation -= val.allocation;
+      updateScale(false);
+    }
+    if (writerList.isEmpty()) {
+      rowsAddedSinceCheck = 0;
+    }
+  }
+
+  /**
+   * Get the total pool size that is available for ORC writers.
+   * @return the number of bytes in the pool
+   */
+  public long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * The scaling factor for each allocation to ensure that the pool isn't
+   * oversubscribed.
+   * @return a fraction between 0.0 and 1.0 of the requested size that is
+   * available for each writer.
+   */
+  public double getAllocationScale() {
+    return currentScale;
+  }
+
+  /**
+   * Give the memory manager an opportunity for doing a memory check.
+   * @param rows number of rows added
+   * @throws IOException
+   */
+  public void addedRow(int rows) throws IOException {
+    rowsAddedSinceCheck += rows;
+    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+      notifyWriters();
+    }
+  }
+
+  /**
+   * Notify all of the writers that they should check their memory usage.
+   * @throws IOException
+   */
+  public void notifyWriters() throws IOException {
+    checkOwner();
+    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+    for(WriterInfo writer: writerList.values()) {
+      boolean flushed = writer.callback.checkMemory(currentScale);
+      if (LOG.isDebugEnabled() && flushed) {
+        LOG.debug("flushed " + writer.toString());
+      }
+    }
+    rowsAddedSinceCheck = 0;
+  }
+
+  /**
+   * Update the currentScale based on the current allocation and pool size.
+   * @param isAllocate is this an allocation?
+   */
+  private void updateScale(boolean isAllocate) throws IOException {
+    if (totalAllocation <= totalMemoryPool) {
+      currentScale = 1;
+    } else {
+      currentScale = (double) totalMemoryPool / totalAllocation;
+    }
+  }
+}
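
updateScale() gives every writer the same fraction of its request: with a 1 GB
pool and three writers asking for 512 MB each, totalAllocation reaches 1.5 GB,
so currentScale drops to about 0.67 and each writer should stay under roughly
341 MB. A sketch of wiring a writer into the pool (the Callback body is
illustrative; the real one lives in the ORC writer):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.impl.MemoryManager;

    class MemoryManagerSketch {
      public static void main(String[] args) throws IOException {
        MemoryManager manager = new MemoryManager(new Configuration());
        Path path = new Path("/tmp/example.orc");   // illustrative path
        final long requested = 512L * 1024 * 1024;  // 512MB of stripe buffer
        manager.addWriter(path, requested, new MemoryManager.Callback() {
          @Override
          public boolean checkMemory(double newScale) {
            // a real writer flushes a stripe and returns true when its
            // buffered bytes exceed requested * newScale
            return false;
          }
        });
        manager.addedRow(5000);      // reaches ROWS_BETWEEN_CHECKS and notifies
        manager.removeWriter(path);  // release the allocation on close
      }
    }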

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/MetadataReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReader.java b/orc/src/java/org/apache/orc/impl/MetadataReader.java
new file mode 100644
index 0000000..670a81a
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MetadataReader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
+
+public interface MetadataReader {
+  OrcIndex readRowIndex(StripeInformation stripe,
+      OrcProto.StripeFooter footer, boolean[] included,
+      OrcProto.RowIndex[] indexes, boolean[] sargColumns,
+      OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException;
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException;
+
+  void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
new file mode 100644
index 0000000..d0ded52
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/MetadataReaderImpl.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
+
+import com.google.common.collect.Lists;
+
+public class MetadataReaderImpl implements MetadataReader {
+  private final FSDataInputStream file;
+  private final CompressionCodec codec;
+  private final int bufferSize;
+  private final int typeCount;
+
+  public MetadataReaderImpl(FileSystem fileSystem, Path path,
+      CompressionCodec codec, int bufferSize, int typeCount) throws IOException {
+    this(fileSystem.open(path), codec, bufferSize, typeCount);
+  }
+
+  public MetadataReaderImpl(FSDataInputStream file,
+      CompressionCodec codec, int bufferSize, int typeCount) {
+    this.file = file;
+    this.codec = codec;
+    this.bufferSize = bufferSize;
+    this.typeCount = typeCount;
+  }
+
+  @Override
+  public OrcIndex readRowIndex(StripeInformation stripe,
+      OrcProto.StripeFooter footer, boolean[] included, OrcProto.RowIndex[] indexes,
+      boolean[] sargColumns, OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException {
+    if (footer == null) {
+      footer = readStripeFooter(stripe);
+    }
+    if (indexes == null) {
+      indexes = new OrcProto.RowIndex[typeCount];
+    }
+    if (bloomFilterIndices == null) {
+      bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+    }
+    long offset = stripe.getOffset();
+    List<OrcProto.Stream> streams = footer.getStreamsList();
+    for (int i = 0; i < streams.size(); i++) {
+      OrcProto.Stream stream = streams.get(i);
+      OrcProto.Stream nextStream = null;
+      if (i < streams.size() - 1) {
+        nextStream = streams.get(i+1);
+      }
+      int col = stream.getColumn();
+      int len = (int) stream.getLength();
+      // row index streams and bloom filters are interlaced; check whether the
+      // sarg column has a bloom filter and, if so, combine the IO to read the
+      // row index and bloom filter for that column together
+      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+        boolean readBloomFilter = false;
+        if (sargColumns != null && sargColumns[col] && nextStream != null &&
+            nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+          len += nextStream.getLength();
+          i += 1;
+          readBloomFilter = true;
+        }
+        if ((included == null || included[col]) && indexes[col] == null) {
+          byte[] buffer = new byte[len];
+          file.readFully(offset, buffer, 0, buffer.length);
+          ByteBuffer bb = ByteBuffer.wrap(buffer);
+          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+              Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+              codec, bufferSize));
+          if (readBloomFilter) {
+            bb.position((int) stream.getLength());
+            bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+                "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                nextStream.getLength(), codec, bufferSize));
+          }
+        }
+      }
+      offset += len;
+    }
+
+    return new OrcIndex(indexes, bloomFilterIndices);
+  }
+
+  @Override
+  public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+    long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+    int tailLength = (int) stripe.getFooterLength();
+
+    // read the footer
+    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+    file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+    return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+        Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+        tailLength, codec, bufferSize));
+  }
+
+  @Override
+  public void close() throws IOException {
+    file.close();
+  }
+}
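
A hypothetical caller that pulls the row-group indexes for every stripe; fs,
path, codec, bufferSize, typeCount, and stripes would all come from the file's
Reader and are assumed here:

    MetadataReader metadata =
        new MetadataReaderImpl(fs, path, codec, bufferSize, typeCount);
    for (StripeInformation stripe : stripes) {
      // footer == null re-reads the stripe footer; included == null selects
      // all columns; with no sarg columns, bloom filters are not read
      OrcIndex index =
          metadata.readRowIndex(stripe, null, null, null, null, null);
      OrcProto.RowIndex[] rowIndexes = index.getRowGroupIndex();
    }
    metadata.close();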

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcIndex.java b/orc/src/java/org/apache/orc/impl/OrcIndex.java
new file mode 100644
index 0000000..50a15f2
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/OrcIndex.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcProto;
+
+public final class OrcIndex {
+  OrcProto.RowIndex[] rowGroupIndex;
+  OrcProto.BloomFilterIndex[] bloomFilterIndex;
+
+  public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+    this.rowGroupIndex = rgIndex;
+    this.bloomFilterIndex = bfIndex;
+  }
+
+  public OrcProto.RowIndex[] getRowGroupIndex() {
+    return rowGroupIndex;
+  }
+
+  public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
+    return bloomFilterIndex;
+  }
+
+  public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
+    this.rowGroupIndex = rowGroupIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OutStream.java b/orc/src/java/org/apache/orc/impl/OutStream.java
new file mode 100644
index 0000000..68ef7d4
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/OutStream.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class OutStream extends PositionedOutputStream {
+
+  public interface OutputReceiver {
+    /**
+     * Output the given buffer to the final destination
+     * @param buffer the buffer to output
+     * @throws IOException
+     */
+    void output(ByteBuffer buffer) throws IOException;
+  }
+
+  public static final int HEADER_SIZE = 3;
+  private final String name;
+  private final OutputReceiver receiver;
+  // if true, the stream will be suppressed when the stripe is written
+  private boolean suppress;
+
+  /**
+   * Stores the uncompressed bytes that have been serialized, but not
+   * compressed yet. When this fills, we compress the entire buffer.
+   */
+  private ByteBuffer current = null;
+
+  /**
+   * Stores the compressed bytes until we have a full buffer and then outputs
+   * them to the receiver. If no compression is being done, this (and overflow)
+   * will always be null and the current buffer will be sent directly to the
+   * receiver.
+   */
+  private ByteBuffer compressed = null;
+
+  /**
+   * Since the compressed buffer may start with contents from previous
+   * compression blocks, we allocate an overflow buffer so that the
+   * output of the codec can be split between the two buffers. After the
+   * compressed buffer is sent to the receiver, the overflow buffer becomes
+   * the new compressed buffer.
+   */
+  private ByteBuffer overflow = null;
+  private final int bufferSize;
+  private final CompressionCodec codec;
+  private long compressedBytes = 0;
+  private long uncompressedBytes = 0;
+
+  public OutStream(String name,
+                   int bufferSize,
+                   CompressionCodec codec,
+                   OutputReceiver receiver) throws IOException {
+    this.name = name;
+    this.bufferSize = bufferSize;
+    this.codec = codec;
+    this.receiver = receiver;
+    this.suppress = false;
+  }
+
+  public void clear() throws IOException {
+    flush();
+    suppress = false;
+  }
+
+  /**
+   * Write the length of the compressed bytes. Life is much easier if the
+   * header is constant length, so just use 3 bytes. Considering most of the
+   * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
+   * be plenty. We also use the low bit for whether it is the original or
+   * compressed bytes.
+   * @param buffer the buffer to write the header to
+   * @param position the position in the buffer to write at
+   * @param val the size in the file
+   * @param original is it uncompressed
+   */
+  private static void writeHeader(ByteBuffer buffer,
+                                  int position,
+                                  int val,
+                                  boolean original) {
+    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
+    buffer.put(position + 1, (byte) (val >> 7));
+    buffer.put(position + 2, (byte) (val >> 15));
+  }
+
+  private void getNewInputBuffer() throws IOException {
+    if (codec == null) {
+      current = ByteBuffer.allocate(bufferSize);
+    } else {
+      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+      writeHeader(current, 0, bufferSize, true);
+      current.position(HEADER_SIZE);
+    }
+  }
+
+  /**
+   * Allocate a new output buffer if we are compressing.
+   */
+  private ByteBuffer getNewOutputBuffer() throws IOException {
+    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+  }
+
+  private void flip() throws IOException {
+    current.limit(current.position());
+    current.position(codec == null ? 0 : HEADER_SIZE);
+  }
+
+  @Override
+  public void write(int i) throws IOException {
+    if (current == null) {
+      getNewInputBuffer();
+    }
+    if (current.remaining() < 1) {
+      spill();
+    }
+    uncompressedBytes += 1;
+    current.put((byte) i);
+  }
+
+  @Override
+  public void write(byte[] bytes, int offset, int length) throws IOException {
+    if (current == null) {
+      getNewInputBuffer();
+    }
+    int remaining = Math.min(current.remaining(), length);
+    current.put(bytes, offset, remaining);
+    uncompressedBytes += remaining;
+    length -= remaining;
+    while (length != 0) {
+      spill();
+      offset += remaining;
+      remaining = Math.min(current.remaining(), length);
+      current.put(bytes, offset, remaining);
+      uncompressedBytes += remaining;
+      length -= remaining;
+    }
+  }
+
+  private void spill() throws java.io.IOException {
+    // if there isn't anything in the current buffer, don't spill
+    if (current == null ||
+        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+      return;
+    }
+    flip();
+    if (codec == null) {
+      receiver.output(current);
+      getNewInputBuffer();
+    } else {
+      if (compressed == null) {
+        compressed = getNewOutputBuffer();
+      } else if (overflow == null) {
+        overflow = getNewOutputBuffer();
+      }
+      int sizePosn = compressed.position();
+      compressed.position(compressed.position() + HEADER_SIZE);
+      if (codec.compress(current, compressed, overflow)) {
+        uncompressedBytes = 0;
+        // move position back to after the header
+        current.position(HEADER_SIZE);
+        current.limit(current.capacity());
+        // find the total bytes in the chunk
+        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
+        if (overflow != null) {
+          totalBytes += overflow.position();
+        }
+        compressedBytes += totalBytes + HEADER_SIZE;
+        writeHeader(compressed, sizePosn, totalBytes, false);
+        // if we have less than the next header left, spill it.
+        if (compressed.remaining() < HEADER_SIZE) {
+          compressed.flip();
+          receiver.output(compressed);
+          compressed = overflow;
+          overflow = null;
+        }
+      } else {
+        compressedBytes += uncompressedBytes + HEADER_SIZE;
+        uncompressedBytes = 0;
+        // we are using the original, but need to spill the current
+        // compressed buffer first. So back up to where we started,
+        // flip it and add it to done.
+        if (sizePosn != 0) {
+          compressed.position(sizePosn);
+          compressed.flip();
+          receiver.output(compressed);
+          compressed = null;
+          // if we have an overflow, clear it and make it the new compress
+          // buffer
+          if (overflow != null) {
+            overflow.clear();
+            compressed = overflow;
+            overflow = null;
+          }
+        } else {
+          compressed.clear();
+          if (overflow != null) {
+            overflow.clear();
+          }
+        }
+
+        // now add the current buffer into the done list and get a new one.
+        current.position(0);
+        // update the header with the current length
+        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
+        receiver.output(current);
+        getNewInputBuffer();
+      }
+    }
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    if (codec == null) {
+      recorder.addPosition(uncompressedBytes);
+    } else {
+      recorder.addPosition(compressedBytes);
+      recorder.addPosition(uncompressedBytes);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    spill();
+    if (compressed != null && compressed.position() != 0) {
+      compressed.flip();
+      receiver.output(compressed);
+      compressed = null;
+    }
+    uncompressedBytes = 0;
+    compressedBytes = 0;
+    overflow = null;
+    current = null;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public long getBufferSize() {
+    long result = 0;
+    if (current != null) {
+      result += current.capacity();
+    }
+    if (compressed != null) {
+      result += compressed.capacity();
+    }
+    if (overflow != null) {
+      result += overflow.capacity();
+    }
+    return result;
+  }
+
+  /**
+   * Set the suppress flag.
+   */
+  public void suppress() {
+    suppress = true;
+  }
+
+  /**
+   * Return the state of the suppress flag.
+   * @return value of the suppress flag
+   */
+  public boolean isSuppressed() {
+    return suppress;
+  }
+}
+
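
A usage sketch for the uncompressed case (codec == null), where each filled
buffer goes straight to the receiver; the collecting receiver here is
illustrative only:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.orc.impl.OutStream;

    class OutStreamSketch {
      public static void main(String[] args) throws IOException {
        final List<ByteBuffer> chunks = new ArrayList<ByteBuffer>();
        OutStream out = new OutStream("example", 64 * 1024, null,
            new OutStream.OutputReceiver() {
              @Override
              public void output(ByteBuffer buffer) {
                chunks.add(buffer);  // a real receiver writes to the file
              }
            });
        out.write(new byte[]{1, 2, 3}, 0, 3);
        out.flush();  // spills the current buffer to the receiver
      }
    }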

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/PositionProvider.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PositionProvider.java b/orc/src/java/org/apache/orc/impl/PositionProvider.java
new file mode 100644
index 0000000..47cf481
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PositionProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+public interface PositionProvider {
+  long getNext();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PositionRecorder.java b/orc/src/java/org/apache/orc/impl/PositionRecorder.java
new file mode 100644
index 0000000..1fff760
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+public interface PositionRecorder {
+  void addPosition(long offset);
+}
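
PositionRecorder and PositionProvider are the two halves of the row-index
machinery: the writer records stream positions through the former and the
reader replays them through the latter. A minimal list-backed pair,
illustrative only (the real implementations live in the writer's index
builder and the record reader):

    import java.util.ArrayList;
    import java.util.List;

    class ListPositions implements PositionRecorder, PositionProvider {
      private final List<Long> positions = new ArrayList<Long>();
      private int next = 0;

      @Override
      public void addPosition(long offset) {  // writer side
        positions.add(offset);
      }

      @Override
      public long getNext() {                 // reader side
        return positions.get(next++);
      }
    }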

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java b/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java
new file mode 100644
index 0000000..d412939
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PositionedOutputStream.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionedOutputStream extends OutputStream {
+
+  /**
+   * Record the current position to the recorder.
+   * @param recorder the object that receives the position
+   * @throws IOException
+   */
+  public abstract void getPosition(PositionRecorder recorder
+                                   ) throws IOException;
+
+  /**
+   * Get the memory size currently allocated as buffer associated with this
+   * stream.
+   * @return the number of bytes used by buffers.
+   */
+  public abstract long getBufferSize();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RedBlackTree.java b/orc/src/java/org/apache/orc/impl/RedBlackTree.java
new file mode 100644
index 0000000..41aa4b9
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RedBlackTree.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * A memory efficient red-black tree that does not allocate any objects per
+ * element. This class is abstract and assumes that the child class
+ * handles the key and comparisons with the key.
+ */
+abstract class RedBlackTree {
+  public static final int NULL = -1;
+
+  // Various values controlling the offset of the data within the array.
+  private static final int LEFT_OFFSET = 0;
+  private static final int RIGHT_OFFSET = 1;
+  private static final int ELEMENT_SIZE = 2;
+
+  protected int size = 0;
+  private final DynamicIntArray data;
+  protected int root = NULL;
+  protected int lastAdd = 0;
+  private boolean wasAdd = false;
+
+  /**
+   * Create a set with the given initial capacity.
+   */
+  public RedBlackTree(int initialCapacity) {
+    data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
+  }
+
+  /**
+   * Insert a new node into the data array, growing the array as necessary.
+   *
+   * @return Returns the position of the new node.
+   */
+  private int insert(int left, int right, boolean isRed) {
+    int position = size;
+    size += 1;
+    setLeft(position, left, isRed);
+    setRight(position, right);
+    return position;
+  }
+
+  /**
+   * Compare the value at the given position to the new value.
+   * @return 0 if the values are the same, -1 if the new value is smaller and
+   *         1 if the new value is larger.
+   */
+  protected abstract int compareValue(int position);
+
+  /**
+   * Is the given node red as opposed to black? To prevent having an extra word
+   * in the data array, we just use the low bit of the left child index.
+   */
+  protected boolean isRed(int position) {
+    return position != NULL &&
+        (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
+  }
+
+  /**
+   * Set the red bit true or false.
+   */
+  private void setRed(int position, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    if (isRed) {
+      data.set(offset, data.get(offset) | 1);
+    } else {
+      data.set(offset, data.get(offset) & ~1);
+    }
+  }
+
+  /**
+   * Get the left field of the given position.
+   */
+  protected int getLeft(int position) {
+    return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
+  }
+
+  /**
+   * Get the right field of the given position.
+   */
+  protected int getRight(int position) {
+    return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left pointer.
+   */
+  private void setLeft(int position, int left) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (data.get(offset) & 1));
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left pointer.
+   */
+  private void setLeft(int position, int left, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (isRed ? 1 : 0));
+  }
+
+  /**
+   * Set the right field of the given position.
+   */
+  private void setRight(int position, int right) {
+    data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
+  }
+
+  /**
+   * Insert or find a given key in the tree and rebalance the tree correctly.
+   * Rebalancing restores the red-black aspect of the tree to maintain the
+   * invariants:
+   *   1. If a node is red, both of its children are black.
+   *   2. Each child of a node has the same black height (the number of black
+   *      nodes between it and the leaves of the tree).
+   *
+   * Inserted nodes are at the leaves and are red, therefore the only possible
+   * violation is of rule 1 at the node we just put in. Instead of keeping
+   * parent pointers in the nodes, this routine passes the context down the
+   * recursion.
+   *
+   * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3}), where the
+   * two groups are left-right mirror images of each other. See Introduction to
+   * Algorithms by Cormen, Leiserson, and Rivest for an explanation of the
+   * subcases.
+   *
+   * @param node The node that we are fixing right now.
+   * @param fromLeft Did we come down from the left?
+   * @param parent The node's parent
+   * @param grandparent Parent's parent
+   * @param greatGrandparent Grandparent's parent
+   * @return Does parent also need to be checked and/or fixed?
+   */
+  private boolean add(int node, boolean fromLeft, int parent,
+                      int grandparent, int greatGrandparent) {
+    if (node == NULL) {
+      if (root == NULL) {
+        lastAdd = insert(NULL, NULL, false);
+        root = lastAdd;
+        wasAdd = true;
+        return false;
+      } else {
+        lastAdd = insert(NULL, NULL, true);
+        node = lastAdd;
+        wasAdd = true;
+        // connect the new node into the tree
+        if (fromLeft) {
+          setLeft(parent, node);
+        } else {
+          setRight(parent, node);
+        }
+      }
+    } else {
+      int compare = compareValue(node);
+      boolean keepGoing;
+
+      // Recurse down to find where the node needs to be added
+      if (compare < 0) {
+        keepGoing = add(getLeft(node), true, node, parent, grandparent);
+      } else if (compare > 0) {
+        keepGoing = add(getRight(node), false, node, parent, grandparent);
+      } else {
+        lastAdd = node;
+        wasAdd = false;
+        return false;
+      }
+
+      // we don't need to fix the root (because it is always set to black)
+      if (node == root || !keepGoing) {
+        return false;
+      }
+    }
+
+    // Do we need to fix this node? Only if there are two reds right under each
+    // other.
+    if (isRed(node) && isRed(parent)) {
+      if (parent == getLeft(grandparent)) {
+        int uncle = getRight(grandparent);
+        if (isRed(uncle)) {
+          // case 1.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getRight(parent)) {
+            // case 1.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // left-rotate on node
+            setLeft(grandparent, parent);
+            setRight(node, getLeft(parent));
+            setLeft(parent, node);
+          }
+
+          // case 1.2 and 1.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+
+          // right-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getLeft(greatGrandparent) == grandparent) {
+            setLeft(greatGrandparent, parent);
+          } else {
+            setRight(greatGrandparent, parent);
+          }
+          setLeft(grandparent, getRight(parent));
+          setRight(parent, grandparent);
+          return false;
+        }
+      } else {
+        int uncle = getLeft(grandparent);
+        if (isRed(uncle)) {
+          // case 2.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getLeft(parent)) {
+            // case 2.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // right-rotate on node
+            setRight(grandparent, parent);
+            setLeft(node, getRight(parent));
+            setRight(parent, node);
+          }
+          // case 2.2 and 2.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+          // left-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getRight(greatGrandparent) == grandparent) {
+            setRight(greatGrandparent, parent);
+          } else {
+            setLeft(greatGrandparent, parent);
+          }
+          setRight(grandparent, getLeft(parent));
+          setLeft(parent, grandparent);
+          return false;
+        }
+      }
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * Add the new key to the tree.
+   * @return true if the element is a new one.
+   */
+  protected boolean add() {
+    add(root, false, NULL, NULL, NULL);
+    if (wasAdd) {
+      setRed(root, false);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Get the number of elements in the set.
+   */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Reset the table to empty.
+   */
+  public void clear() {
+    root = NULL;
+    size = 0;
+    data.clear();
+  }
+
+  /**
+   * Get the buffer size in bytes.
+   */
+  public long getSizeInBytes() {
+    return data.getSizeInBytes();
+  }
+}
+

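For illustration only (not part of this patch): a minimal sketch of how a
concrete subclass plugs into RedBlackTree. The IntRedBlackTree class below is
hypothetical; the real in-tree consumer is the string dictionary, which
follows the same pattern with its keys stored in a parallel dynamic array
indexed by node position.

    package org.apache.orc.impl;

    /**
     * Hypothetical subclass: stores int keys in a parallel DynamicIntArray
     * and lets RedBlackTree handle balancing and duplicate detection.
     */
    class IntRedBlackTree extends RedBlackTree {
      private final DynamicIntArray keys = new DynamicIntArray();
      private int newKey;

      IntRedBlackTree(int initialCapacity) {
        super(initialCapacity);
      }

      /** Add a key; returns true if it was not already present. */
      boolean add(int key) {
        newKey = key;
        boolean added = super.add();  // descends using compareValue below
        if (added) {
          keys.add(key);              // node ids are assigned in insertion order
        }
        return added;
      }

      @Override
      protected int compareValue(int position) {
        int other = keys.get(position);
        return newKey == other ? 0 : (newKey < other ? -1 : 1);
      }
    }

Note that compareValue compares the pending key against an existing node, so
the key array only needs an entry appended when add() reports a new element.
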
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
new file mode 100644
index 0000000..380f339
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * A reader that reads a sequence of bytes. A control byte is read before
+ * each run: values 0 to 127 mean the next byte repeats 3 to 130 times; values
+ * -1 to -128 mean that 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteReader {
+  private InStream input;
+  private final byte[] literals =
+    new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private int used = 0;
+  private boolean repeat = false;
+
+  public RunLengthByteReader(InStream input) throws IOException {
+    this.input = input;
+  }
+
+  public void setInStream(InStream input) {
+    this.input = input;
+  }
+
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    used = 0;
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of buffer RLE byte from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      repeat = true;
+      numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
+      int val = input.read();
+      if (val == -1) {
+        throw new EOFException("Reading RLE byte got EOF");
+      }
+      literals[0] = (byte) val;
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      int bytes = 0;
+      while (bytes < numLiterals) {
+        int result = input.read(literals, bytes, numLiterals - bytes);
+        if (result == -1) {
+          throw new EOFException("Reading RLE byte literal got EOF in " + this);
+        }
+        bytes += result;
+      }
+    }
+  }
+
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  public byte next() throws IOException {
+    byte result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0];
+    } else {
+      result = literals[used];
+    }
+    ++used;
+    return result;
+  }
+
+  public void nextVector(LongColumnVector previous, long previousLen)
+      throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        previous.vector[i] = next();
+      } else {
+        // The default value of null for int types in vectorized
+        // processing is 1, so set that if the value is null
+        previous.vector[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  public void skip(long items) throws IOException {
+    while (items > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(items, numLiterals - used);
+      used += consume;
+      items -= consume;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+        used + "/" + numLiterals + " from " + input;
+  }
+}

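To make the control-byte semantics above concrete, here is a standalone
decoding sketch (illustration only, not part of the patch) that interprets
the same format from a plain InputStream:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class ByteRleDecodeDemo {
      public static void main(String[] args) throws IOException {
        // 0x02 => repeat run of 2 + 3 = 5 copies of 0x41;
        // 0xFE => 256 - 0xFE = 2 literal bytes follow.
        byte[] encoded = {0x02, 0x41, (byte) 0xFE, 0x10, 0x20};
        InputStream in = new ByteArrayInputStream(encoded);
        int control;
        while ((control = in.read()) != -1) {
          if (control < 0x80) {            // repeat run: 3 to 130 copies
            int count = control + 3;       // MIN_REPEAT_SIZE == 3
            int value = in.read();
            for (int i = 0; i < count; ++i) {
              System.out.printf("0x%02x ", value);
            }
          } else {                         // literal run: 1 to 128 raw bytes
            int count = 0x100 - control;
            for (int i = 0; i < count; ++i) {
              System.out.printf("0x%02x ", in.read());
            }
          }
        }
        System.out.println();  // prints: 0x41 0x41 0x41 0x41 0x41 0x10 0x20
      }
    }
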
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java
new file mode 100644
index 0000000..09108b2
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A writer that emits a sequence of bytes. A control byte is written before
+ * each run: values 0 to 127 mean the next byte repeats 3 to 130 times; values
+ * -1 to -128 mean that 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_LITERAL_SIZE = 128;
+  static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final byte[] literals = new byte[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+
+  public RunLengthByteWriter(PositionedOutputStream output) {
+    this.output = output;
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write(literals, 0, 1);
+      } else {
+        output.write(-numLiterals);
+        output.write(literals, 0, numLiterals);
+      }
+      repeat = false;
+      tailRunLength = 0;
+      numLiterals = 0;
+    }
+  }
+
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  public void write(byte value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0]) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (value == literals[numLiterals - 1]) {
+        tailRunLength += 1;
+      } else {
+        tailRunLength = 1;
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          writeValues();
+          literals[0] = value;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}

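The writer's run detection can be hard to follow from the incremental state
machine alone. The standalone sketch below (illustration only, not part of
the patch) derives the same output format with an explicit scan over a byte
array, writing to a plain stream instead of a PositionedOutputStream:

    import java.io.ByteArrayOutputStream;

    public class ByteRleEncodeDemo {
      public static void main(String[] args) {
        byte[] input = {7, 7, 7, 7, 7, 1, 2, 3};
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int i = 0;
        while (i < input.length) {
          // measure the run length of the current value
          int run = 1;
          while (i + run < input.length && input[i + run] == input[i]) {
            ++run;
          }
          if (run >= 3) {                       // MIN_REPEAT_SIZE
            run = Math.min(run, 130);           // MAX_REPEAT_SIZE
            out.write(run - 3);                 // control byte 0..127
            out.write(input[i]);
            i += run;
          } else {
            // gather literals until a run of >= 3 starts, up to 128
            int start = i;
            while (i < input.length && i - start < 128) {
              int r = 1;
              while (i + r < input.length && input[i + r] == input[i]) {
                ++r;
              }
              if (r >= 3) {
                break;
              }
              i += r;
            }
            out.write(-(i - start));            // control byte -1..-128
            out.write(input, start, i - start);
          }
        }
        for (byte b : out.toByteArray()) {
          System.out.printf("0x%02x ", b & 0xff);
        }
        System.out.println();  // prints: 0x02 0x07 0xfd 0x01 0x02 0x03
      }
    }
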
http://git-wip-us.apache.org/repos/asf/hive/blob/9c7a78ee/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
new file mode 100644
index 0000000..f129c86
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/**
+ * A reader that reads a sequence of run-length encoded integers.
+ */
+public class RunLengthIntegerReader implements IntegerReader {
+  private InStream input;
+  private final boolean signed;
+  private final long[] literals =
+    new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private int delta = 0;
+  private int used = 0;
+  private boolean repeat = false;
+  private SerializationUtils utils;
+
+  public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
+    this.input = input;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
+      used = 0;
+      repeat = true;
+      delta = input.read();
+      if (delta == -1) {
+        throw new EOFException("End of stream in RLE Integer from " + input);
+      }
+      // convert the unsigned 0 to 255 value to a signed -128 to 127 delta
+      delta = (byte) delta;
+      if (signed) {
+        literals[0] = utils.readVslong(input);
+      } else {
+        literals[0] = utils.readVulong(input);
+      }
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      used = 0;
+      for(int i=0; i < numLiterals; ++i) {
+        if (signed) {
+          literals[i] = utils.readVslong(input);
+        } else {
+          literals[i] = utils.readVulong(input);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0] + (used++) * delta;
+    } else {
+      result = literals[used++];
+    }
+    return result;
+  }
+
+  @Override
+  public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        previous.vector[i] = next();
+      } else {
+        // The default value of null for int type in vectorized
+        // processing is 1, so set that if the value is null
+        previous.vector[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  @Override
+  public void setInStream(InStream data) {
+    input = data;
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+}
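For reference, a standalone sketch (illustration only, not part of the
patch) decoding one repeat run of this integer format by hand: a control
byte, a signed delta byte, then the base value as a variable-length integer.
The varint helpers below mirror the semantics of the SerializationUtils
readVulong and readVslong methods used by the reader.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class IntRleDecodeDemo {
      // Unsigned base-128 varint, low bits first, as in readVulong.
      static long readVulong(InputStream in) throws IOException {
        long result = 0;
        int offset = 0;
        int b;
        do {
          b = in.read();
          result |= (long) (b & 0x7f) << offset;
          offset += 7;
        } while (b >= 0x80);
        return result;
      }

      // Signed variant: zigzag-decode the unsigned value, as in readVslong.
      static long readVslong(InputStream in) throws IOException {
        long value = readVulong(in);
        return (value >>> 1) ^ -(value & 1);
      }

      public static void main(String[] args) throws IOException {
        // control 0x02 => 2 + 3 = 5 values; delta byte +1;
        // signed base varint 0x14 zigzag-decodes to 10.
        InputStream in = new ByteArrayInputStream(new byte[]{0x02, 0x01, 0x14});
        int count = in.read() + 3;       // MIN_REPEAT_SIZE == 3
        int delta = (byte) in.read();    // sign-extend the delta byte
        long base = readVslong(in);
        for (int i = 0; i < count; ++i) {
          System.out.print((base + i * delta) + " ");  // 10 11 12 13 14
        }
        System.out.println();
      }
    }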