Posted to commits@orc.apache.org by om...@apache.org on 2016/05/13 19:50:48 UTC

[19/23] orc git commit: ORC-1 Import of ORC code from Hive. (omalley reviewed by prasanthj)

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
new file mode 100644
index 0000000..bb73d53
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -0,0 +1,107 @@
+package org.apache.orc.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionKind;
+
+import javax.annotation.Nullable;
+
+public final class DataReaderProperties {
+
+  private final FileSystem fileSystem;
+  private final Path path;
+  private final CompressionKind compression;
+  private final boolean zeroCopy;
+  private final int typeCount;
+  private final int bufferSize;
+
+  private DataReaderProperties(Builder builder) {
+    this.fileSystem = builder.fileSystem;
+    this.path = builder.path;
+    this.compression = builder.compression;
+    this.zeroCopy = builder.zeroCopy;
+    this.typeCount = builder.typeCount;
+    this.bufferSize = builder.bufferSize;
+  }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public CompressionKind getCompression() {
+    return compression;
+  }
+
+  public boolean getZeroCopy() {
+    return zeroCopy;
+  }
+
+  public int getTypeCount() {
+    return typeCount;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private FileSystem fileSystem;
+    private Path path;
+    private CompressionKind compression;
+    private boolean zeroCopy;
+    private int typeCount;
+    private int bufferSize;
+
+    private Builder() {
+
+    }
+
+    public Builder withFileSystem(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+      return this;
+    }
+
+    public Builder withPath(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    public Builder withCompression(CompressionKind value) {
+      this.compression = value;
+      return this;
+    }
+
+    public Builder withZeroCopy(boolean zeroCopy) {
+      this.zeroCopy = zeroCopy;
+      return this;
+    }
+
+    public Builder withTypeCount(int value) {
+      this.typeCount = value;
+      return this;
+    }
+
+    public Builder withBufferSize(int value) {
+      this.bufferSize = value;
+      return this;
+    }
+
+    public DataReaderProperties build() {
+      Preconditions.checkNotNull(fileSystem);
+      Preconditions.checkNotNull(path);
+
+      return new DataReaderProperties(this);
+    }
+
+  }
+}
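
For reference, a quick sketch of how the builder above is meant to be used; the file system, path, and sizes below are made-up placeholders rather than values from this commit. Note that build() only enforces that fileSystem and path were supplied (via Preconditions.checkNotNull); the other fields are optional.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.DataReaderProperties;

    public class DataReaderPropertiesExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        DataReaderProperties props = DataReaderProperties.builder()
            .withFileSystem(fs)
            .withPath(new Path("/tmp/example.orc"))   // hypothetical file
            .withCompression(CompressionKind.ZLIB)
            .withZeroCopy(false)
            .withTypeCount(10)
            .withBufferSize(256 * 1024)
            .build();                                  // NPE if fileSystem or path is missing
        System.out.println(props.getCompression());    // ZLIB
      }
    }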

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java b/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
new file mode 100644
index 0000000..7e0110d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface DirectDecompressionCodec extends CompressionCodec {
+  public boolean isAvailable();
+  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
+}
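
A minimal sketch of how a caller might use this interface; the codec instance is assumed to come from elsewhere (no implementation is added in this file), and the fallback path relies on CompressionCodec.decompress(ByteBuffer, ByteBuffer) as used later in InStream.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.orc.impl.DirectDecompressionCodec;

    public class DirectDecompressExample {
      // Hypothetical helper: decompress one chunk, preferring the direct path.
      static void decompressChunk(DirectDecompressionCodec codec,
                                  ByteBuffer compressed,
                                  ByteBuffer uncompressed) throws IOException {
        if (codec.isAvailable()) {
          // native, buffer-to-buffer decompression
          codec.directDecompress(compressed, uncompressed);
        } else {
          // fall back to the generic CompressionCodec path
          codec.decompress(compressed, uncompressed);
        }
      }
    }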

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java b/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java
new file mode 100644
index 0000000..986c2ac
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * A class that is a growable array of bytes. Growth is managed in terms of
+ * chunks that are allocated when needed.
+ */
+public final class DynamicByteArray {
+  static final int DEFAULT_CHUNKSIZE = 32 * 1024;
+  static final int DEFAULT_NUM_CHUNKS = 128;
+
+  private final int chunkSize;        // our allocation sizes
+  private byte[][] data;              // the real data
+  private int length;                 // max set element index +1
+  private int initializedChunks = 0;  // the number of chunks created
+
+  public DynamicByteArray() {
+    this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
+  }
+
+  public DynamicByteArray(int numChunks, int chunkSize) {
+    if (chunkSize == 0) {
+      throw new IllegalArgumentException("bad chunksize");
+    }
+    this.chunkSize = chunkSize;
+    data = new byte[numChunks][];
+  }
+
+  /**
+   * Ensure that the chunk for the given chunk index has been allocated.
+   */
+  private void grow(int chunkIndex) {
+    if (chunkIndex >= initializedChunks) {
+      if (chunkIndex >= data.length) {
+        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
+        byte[][] newChunk = new byte[newSize][];
+        System.arraycopy(data, 0, newChunk, 0, data.length);
+        data = newChunk;
+      }
+      for(int i=initializedChunks; i <= chunkIndex; ++i) {
+        data[i] = new byte[chunkSize];
+      }
+      initializedChunks = chunkIndex + 1;
+    }
+  }
+
+  public byte get(int index) {
+    if (index >= length) {
+      throw new IndexOutOfBoundsException("Index " + index +
+                                            " is outside of 0.." +
+                                            (length - 1));
+    }
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    return data[i][j];
+  }
+
+  public void set(int index, byte value) {
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    grow(i);
+    if (index >= length) {
+      length = index + 1;
+    }
+    data[i][j] = value;
+  }
+
+  public int add(byte value) {
+    int i = length / chunkSize;
+    int j = length % chunkSize;
+    grow(i);
+    data[i][j] = value;
+    int result = length;
+    length += 1;
+    return result;
+  }
+
+  /**
+   * Copy a slice of a byte array into our buffer.
+   * @param value the array to copy from
+   * @param valueOffset the first location to copy from value
+   * @param valueLength the number of bytes to copy from value
+   * @return the offset of the start of the value
+   */
+  public int add(byte[] value, int valueOffset, int valueLength) {
+    int i = length / chunkSize;
+    int j = length % chunkSize;
+    grow((length + valueLength) / chunkSize);
+    int remaining = valueLength;
+    while (remaining > 0) {
+      int size = Math.min(remaining, chunkSize - j);
+      System.arraycopy(value, valueOffset, data[i], j, size);
+      remaining -= size;
+      valueOffset += size;
+      i += 1;
+      j = 0;
+    }
+    int result = length;
+    length += valueLength;
+    return result;
+  }
+
+  /**
+   * Read the entire stream into this array.
+   * @param in the stream to read from
+   * @throws IOException
+   */
+  public void readAll(InputStream in) throws IOException {
+    int currentChunk = length / chunkSize;
+    int currentOffset = length % chunkSize;
+    grow(currentChunk);
+    int currentLength = in.read(data[currentChunk], currentOffset,
+      chunkSize - currentOffset);
+    while (currentLength > 0) {
+      length += currentLength;
+      currentOffset = length % chunkSize;
+      if (currentOffset == 0) {
+        currentChunk = length / chunkSize;
+        grow(currentChunk);
+      }
+      currentLength = in.read(data[currentChunk], currentOffset,
+        chunkSize - currentOffset);
+    }
+  }
+
+  /**
+   * Byte compare a set of bytes against the bytes in this dynamic array.
+   * @param other source of the other bytes
+   * @param otherOffset start offset in the other array
+   * @param otherLength number of bytes in the other array
+   * @param ourOffset the offset in our array
+   * @param ourLength the number of bytes in our array
+   * @return negative for less, 0 for equal, positive for greater
+   */
+  public int compare(byte[] other, int otherOffset, int otherLength,
+                     int ourOffset, int ourLength) {
+    int currentChunk = ourOffset / chunkSize;
+    int currentOffset = ourOffset % chunkSize;
+    int maxLength = Math.min(otherLength, ourLength);
+    while (maxLength > 0 &&
+      other[otherOffset] == data[currentChunk][currentOffset]) {
+      otherOffset += 1;
+      currentOffset += 1;
+      if (currentOffset == chunkSize) {
+        currentChunk += 1;
+        currentOffset = 0;
+      }
+      maxLength -= 1;
+    }
+    if (maxLength == 0) {
+      return otherLength - ourLength;
+    }
+    int otherByte = 0xff & other[otherOffset];
+    int ourByte = 0xff & data[currentChunk][currentOffset];
+    return otherByte > ourByte ? 1 : -1;
+  }
+
+  /**
+   * Get the size of the array.
+   * @return the number of bytes in the array
+   */
+  public int size() {
+    return length;
+  }
+
+  /**
+   * Clear the array to its original pristine state.
+   */
+  public void clear() {
+    length = 0;
+    for(int i=0; i < data.length; ++i) {
+      data[i] = null;
+    }
+    initializedChunks = 0;
+  }
+
+  /**
+   * Set a text value from the bytes in this dynamic array.
+   * @param result the value to set
+   * @param offset the start of the bytes to copy
+   * @param length the number of bytes to copy
+   */
+  public void setText(Text result, int offset, int length) {
+    result.clear();
+    int currentChunk = offset / chunkSize;
+    int currentOffset = offset % chunkSize;
+    int currentLength = Math.min(length, chunkSize - currentOffset);
+    while (length > 0) {
+      result.append(data[currentChunk], currentOffset, currentLength);
+      length -= currentLength;
+      currentChunk += 1;
+      currentOffset = 0;
+      currentLength = Math.min(length, chunkSize - currentOffset);
+    }
+  }
+
+  /**
+   * Write out a range of this dynamic array to an output stream.
+   * @param out the stream to write to
+   * @param offset the first offset to write
+   * @param length the number of bytes to write
+   * @throws IOException
+   */
+  public void write(OutputStream out, int offset,
+                    int length) throws IOException {
+    int currentChunk = offset / chunkSize;
+    int currentOffset = offset % chunkSize;
+    while (length > 0) {
+      int currentLength = Math.min(length, chunkSize - currentOffset);
+      out.write(data[currentChunk], currentOffset, currentLength);
+      length -= currentLength;
+      currentChunk += 1;
+      currentOffset = 0;
+    }
+  }
+
+  @Override
+  public String toString() {
+    int i;
+    StringBuilder sb = new StringBuilder(length * 3);
+
+    sb.append('{');
+    int l = length - 1;
+    for (i=0; i<l; i++) {
+      sb.append(Integer.toHexString(get(i)));
+      sb.append(',');
+    }
+    sb.append(get(i));
+    sb.append('}');
+
+    return sb.toString();
+  }
+
+  public void setByteBuffer(ByteBuffer result, int offset, int length) {
+    result.clear();
+    int currentChunk = offset / chunkSize;
+    int currentOffset = offset % chunkSize;
+    int currentLength = Math.min(length, chunkSize - currentOffset);
+    while (length > 0) {
+      result.put(data[currentChunk], currentOffset, currentLength);
+      length -= currentLength;
+      currentChunk += 1;
+      currentOffset = 0;
+      currentLength = Math.min(length, chunkSize - currentOffset);
+    }
+  }
+
+  /**
+   * Gets all the bytes of the array.
+   *
+   * @return Bytes of the array
+   */
+  public byte[] get() {
+    byte[] result = null;
+    if (length > 0) {
+      int currentChunk = 0;
+      int currentOffset = 0;
+      int currentLength = Math.min(length, chunkSize);
+      int destOffset = 0;
+      result = new byte[length];
+      int totalLength = length;
+      while (totalLength > 0) {
+        System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+        destOffset += currentLength;
+        totalLength -= currentLength;
+        currentChunk += 1;
+        currentOffset = 0;
+        currentLength = Math.min(totalLength, chunkSize - currentOffset);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Get the size of the buffers.
+   */
+  public long getSizeInBytes() {
+    return initializedChunks * chunkSize;
+  }
+}
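
The chunk bookkeeping stays internal; callers only work with flat offsets. A small illustrative round trip (the payload is arbitrary):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.impl.DynamicByteArray;

    public class DynamicByteArrayExample {
      public static void main(String[] args) {
        DynamicByteArray buffer = new DynamicByteArray();
        byte[] payload = "orc".getBytes(StandardCharsets.UTF_8);
        int start = buffer.add(payload, 0, payload.length);  // returns 0, the start offset
        buffer.add((byte) '!');                               // appends one more byte
        Text text = new Text();
        buffer.setText(text, start, payload.length);          // text now holds "orc"
        System.out.println(buffer.size());                    // 4
        System.out.println(text);                             // orc
      }
    }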

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java b/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java
new file mode 100644
index 0000000..3b2884b
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+/**
+ * Dynamic int array that uses primitive types and chunks to avoid copying
+ * large numbers of integers when it resizes.
+ *
+ * The motivation for this class is memory optimization, i.e. space-efficient
+ * storage of potentially huge arrays without good a priori size guesses.
+ *
+ * The API of this class is between a primitive array and an AbstractList. It's
+ * not a Collection implementation because it handles primitive types, but the
+ * API could be extended to support iterators and the like.
+ *
+ * NOTE: Like standard Collection implementations/arrays, this class is not
+ * synchronized.
+ */
+public final class DynamicIntArray {
+  static final int DEFAULT_CHUNKSIZE = 8 * 1024;
+  static final int INIT_CHUNKS = 128;
+
+  private final int chunkSize;       // our allocation size
+  private int[][] data;              // the real data
+  private int length;                // max set element index +1
+  private int initializedChunks = 0; // the number of created chunks
+
+  public DynamicIntArray() {
+    this(DEFAULT_CHUNKSIZE);
+  }
+
+  public DynamicIntArray(int chunkSize) {
+    this.chunkSize = chunkSize;
+
+    data = new int[INIT_CHUNKS][];
+  }
+
+  /**
+   * Ensure that the chunk for the given chunk index has been allocated.
+   */
+  private void grow(int chunkIndex) {
+    if (chunkIndex >= initializedChunks) {
+      if (chunkIndex >= data.length) {
+        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
+        int[][] newChunk = new int[newSize][];
+        System.arraycopy(data, 0, newChunk, 0, data.length);
+        data = newChunk;
+      }
+      for (int i=initializedChunks; i <= chunkIndex; ++i) {
+        data[i] = new int[chunkSize];
+      }
+      initializedChunks = chunkIndex + 1;
+    }
+  }
+
+  public int get(int index) {
+    if (index >= length) {
+      throw new IndexOutOfBoundsException("Index " + index +
+                                            " is outside of 0.." +
+                                            (length - 1));
+    }
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    return data[i][j];
+  }
+
+  public void set(int index, int value) {
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    grow(i);
+    if (index >= length) {
+      length = index + 1;
+    }
+    data[i][j] = value;
+  }
+
+  public void increment(int index, int value) {
+    int i = index / chunkSize;
+    int j = index % chunkSize;
+    grow(i);
+    if (index >= length) {
+      length = index + 1;
+    }
+    data[i][j] += value;
+  }
+
+  public void add(int value) {
+    int i = length / chunkSize;
+    int j = length % chunkSize;
+    grow(i);
+    data[i][j] = value;
+    length += 1;
+  }
+
+  public int size() {
+    return length;
+  }
+
+  public void clear() {
+    length = 0;
+    for(int i=0; i < data.length; ++i) {
+      data[i] = null;
+    }
+    initializedChunks = 0;
+  }
+
+  public String toString() {
+    int i;
+    StringBuilder sb = new StringBuilder(length * 4);
+
+    sb.append('{');
+    int l = length - 1;
+    for (i=0; i<l; i++) {
+      sb.append(get(i));
+      sb.append(',');
+    }
+    sb.append(get(i));
+    sb.append('}');
+
+    return sb.toString();
+  }
+
+  public int getSizeInBytes() {
+    return 4 * initializedChunks * chunkSize;
+  }
+}
+
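
DynamicIntArray works the same way for ints. A quick sketch of the add/set/increment semantics; note that set() beyond the current length simply extends it:

    import org.apache.orc.impl.DynamicIntArray;

    public class DynamicIntArrayExample {
      public static void main(String[] args) {
        DynamicIntArray counts = new DynamicIntArray();
        counts.add(7);                     // index 0
        counts.set(4, 42);                 // extends length to 5; indices 1..3 stay 0
        counts.increment(0, 3);            // index 0 becomes 10
        System.out.println(counts.get(0));           // 10
        System.out.println(counts.size());           // 5
        // one initialized chunk of DEFAULT_CHUNKSIZE (8 * 1024) ints = 32768 bytes
        System.out.println(counts.getSizeInBytes());
      }
    }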

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims.java b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
new file mode 100644
index 0000000..ef7d70f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.VersionInfo;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public interface HadoopShims {
+
+  enum DirectCompressionType {
+    NONE,
+    ZLIB_NOHEADER,
+    ZLIB,
+    SNAPPY,
+  }
+
+  interface DirectDecompressor {
+    void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException;
+  }
+
+  /**
+   * Get a direct decompressor, if one is available for the given codec.
+   * @param codec the compression type to decompress
+   * @return the direct decompressor, or null if none is available
+   */
+  DirectDecompressor getDirectDecompressor(DirectCompressionType codec);
+
+  /**
+   * a hadoop.io ByteBufferPool shim.
+   */
+  public interface ByteBufferPoolShim {
+    /**
+     * Get a new ByteBuffer from the pool.  The pool can provide this from
+     * removing a buffer from its internal cache, or by allocating a
+     * new buffer.
+     *
+     * @param direct     Whether the buffer should be direct.
+     * @param length     The minimum length the buffer will have.
+     * @return           A new ByteBuffer. Its capacity can be less
+     *                   than what was requested, but must be at
+     *                   least 1 byte.
+     */
+    ByteBuffer getBuffer(boolean direct, int length);
+
+    /**
+     * Release a buffer back to the pool.
+     * The pool may choose to put this buffer into its cache/free it.
+     *
+     * @param buffer    a direct bytebuffer
+     */
+    void putBuffer(ByteBuffer buffer);
+  }
+
+  /**
+   * Provides an HDFS ZeroCopyReader shim.
+   * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
+   * @param pool ByteBufferPoolShim to allocate fallback buffers with
+   *
+   * @return a ZeroCopyReaderShim, or null if zero-copy is not supported
+   */
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+  public interface ZeroCopyReaderShim extends Closeable {
+    /**
+     * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
+     * Also advance the input stream by that amount. The data read can be smaller than maxLength.
+     *
+     * @return ByteBuffer read from the stream
+     */
+    public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+    /**
+     * Release a ByteBuffer obtained from a call to readBuffer() so that the
+     * underlying pool or cache can reclaim it.
+     *
+     */
+    public void releaseBuffer(ByteBuffer buffer);
+
+    /**
+     * Close the underlying stream.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+  }
+  /**
+   * Read data into a Text object in the fastest way possible
+   */
+  public interface TextReaderShim {
+    /**
+     * Read size bytes from the stream into txt.
+     * @param txt the Text object to read into
+     * @param size the number of bytes to read
+     * @throws IOException
+     */
+    void read(Text txt, int size) throws IOException;
+  }
+
+  /**
+   * Wrap a TextReaderShim around an input stream. The reader shim will not
+   * buffer any reads from the underlying stream and will only consume bytes
+   * which are required for TextReaderShim.read() input.
+   */
+  public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
+
+  class Factory {
+    private static HadoopShims SHIMS = null;
+
+    public static synchronized HadoopShims get() {
+      if (SHIMS == null) {
+        String[] versionParts = VersionInfo.getVersion().split("[.]");
+        int major = Integer.parseInt(versionParts[0]);
+        int minor = Integer.parseInt(versionParts[1]);
+        if (major < 2 || (major == 2 && minor < 3)) {
+          SHIMS = new HadoopShims_2_2();
+        } else {
+          SHIMS = new HadoopShimsCurrent();
+        }
+      }
+      return SHIMS;
+    }
+  }
+}
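
Factory.get() parses org.apache.hadoop.util.VersionInfo once and caches the shim: Hadoop 2.3 and later get HadoopShimsCurrent, older releases get HadoopShims_2_2. A hedged sketch of probing for direct ZLIB decompression; the helper name and buffer handling are illustrative, not part of this commit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.orc.impl.HadoopShims;

    public class ShimExample {
      // Decompress one ZLIB chunk if the running Hadoop supports direct decompression.
      // Buffers are assumed to be positioned/limited by the caller; returns false
      // when the shim offers no direct decompressor (e.g. on Hadoop 2.2).
      static boolean tryDirectZlib(ByteBuffer compressed,
                                   ByteBuffer uncompressed) throws IOException {
        HadoopShims shims = HadoopShims.Factory.get();   // cached per VersionInfo
        HadoopShims.DirectDecompressor zlib =
            shims.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB);
        if (zlib == null) {
          return false;
        }
        zlib.decompress(compressed, uncompressed);
        return true;
      }
    }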

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
new file mode 100644
index 0000000..5c53f74
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Shims for recent versions of Hadoop
+ */
+public class HadoopShimsCurrent implements HadoopShims {
+
+  private static class DirectDecompressWrapper implements DirectDecompressor {
+    private final org.apache.hadoop.io.compress.DirectDecompressor root;
+
+    DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) {
+      this.root = root;
+    }
+
+    public void decompress(ByteBuffer input,
+                           ByteBuffer output) throws IOException {
+      root.decompress(input, output);
+    }
+  }
+
+  public DirectDecompressor getDirectDecompressor(
+      DirectCompressionType codec) {
+    switch (codec) {
+      case ZLIB:
+        return new DirectDecompressWrapper
+            (new ZlibDecompressor.ZlibDirectDecompressor());
+      case ZLIB_NOHEADER:
+        return new DirectDecompressWrapper
+            (new ZlibDecompressor.ZlibDirectDecompressor
+                (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
+      case SNAPPY:
+        return new DirectDecompressWrapper
+            (new SnappyDecompressor.SnappyDirectDecompressor());
+      default:
+        return null;
+    }
+  }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                              ByteBufferPoolShim pool
+                                              ) throws IOException {
+    return ZeroCopyShims.getZeroCopyReader(in, pool);
+  }
+
+  private final class FastTextReaderShim implements TextReaderShim {
+    private final DataInputStream din;
+
+    public FastTextReaderShim(InputStream in) {
+      this.din = new DataInputStream(in);
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      txt.readWithKnownLength(din, len);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    return new FastTextReaderShim(in);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
new file mode 100644
index 0000000..3f65e74
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+
+/**
+ * Shims for versions of Hadoop up to and including 2.2.x
+ */
+public class HadoopShims_2_2 implements HadoopShims {
+
+  final boolean zeroCopy;
+  final boolean fastRead;
+
+  HadoopShims_2_2() {
+    boolean zcr = false;
+    try {
+      Class.forName("org.apache.hadoop.fs.CacheFlag", false,
+        HadoopShims_2_2.class.getClassLoader());
+      zcr = true;
+    } catch (ClassNotFoundException ce) {
+    }
+    zeroCopy = zcr;
+    boolean fastRead = false;
+    if (zcr) {
+      for (Method m : Text.class.getMethods()) {
+        if ("readWithKnownLength".equals(m.getName())) {
+          fastRead = true;
+        }
+      }
+    }
+    this.fastRead = fastRead;
+  }
+
+  public DirectDecompressor getDirectDecompressor(
+      DirectCompressionType codec) {
+    return null;
+  }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                              ByteBufferPoolShim pool
+                                              ) throws IOException {
+    if(zeroCopy) {
+      return ZeroCopyShims.getZeroCopyReader(in, pool);
+    }
+    /* not supported */
+    return null;
+  }
+
+  private final class BasicTextReaderShim implements TextReaderShim {
+    private final InputStream in;
+
+    public BasicTextReaderShim(InputStream in) {
+      this.in = in;
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      int offset = 0;
+      byte[] bytes = new byte[len];
+      while (len > 0) {
+        int written = in.read(bytes, offset, len);
+        if (written < 0) {
+          throw new EOFException("Can't finish read from " + in + " read "
+              + (offset) + " bytes out of " + bytes.length);
+        }
+        len -= written;
+        offset += written;
+      }
+      txt.set(bytes);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    return new BasicTextReaderShim(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
new file mode 100644
index 0000000..851f645
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.orc.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedInputStream;
+
+public abstract class InStream extends InputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
+  public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
+
+  protected final String name;
+  protected long length;
+
+  public InStream(String name, long length) {
+    this.name = name;
+    this.length = length;
+  }
+
+  public String getStreamName() {
+    return name;
+  }
+
+  public long getStreamLength() {
+    return length;
+  }
+
+  @Override
+  public abstract void close();
+
+  public static class UncompressedStream extends InStream {
+    private List<DiskRange> bytes;
+    private long length;
+    protected long currentOffset;
+    private ByteBuffer range;
+    private int currentRange;
+
+    public UncompressedStream(String name, List<DiskRange> input, long length) {
+      super(name, length);
+      reset(input, length);
+    }
+
+    protected void reset(List<DiskRange> input, long length) {
+      this.bytes = input;
+      this.length = length;
+      currentRange = 0;
+      currentOffset = 0;
+      range = null;
+    }
+
+    @Override
+    public int read() {
+      if (range == null || range.remaining() == 0) {
+        if (currentOffset == length) {
+          return -1;
+        }
+        seek(currentOffset);
+      }
+      currentOffset += 1;
+      return 0xff & range.get();
+    }
+
+    @Override
+    public int read(byte[] data, int offset, int length) {
+      if (range == null || range.remaining() == 0) {
+        if (currentOffset == this.length) {
+          return -1;
+        }
+        seek(currentOffset);
+      }
+      int actualLength = Math.min(length, range.remaining());
+      range.get(data, offset, actualLength);
+      currentOffset += actualLength;
+      return actualLength;
+    }
+
+    @Override
+    public int available() {
+      if (range != null && range.remaining() > 0) {
+        return range.remaining();
+      }
+      return (int) (length - currentOffset);
+    }
+
+    @Override
+    public void close() {
+      currentRange = bytes.size();
+      currentOffset = length;
+      // explicit de-ref of bytes[]
+      bytes.clear();
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      seek(index.getNext());
+    }
+
+    public void seek(long desired) {
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for (DiskRange curRange : bytes) {
+        if (desired == 0 && curRange.getData().remaining() == 0) {
+          logEmptySeek(name);
+          return;
+        }
+        if (curRange.getOffset() <= desired &&
+            (desired - curRange.getOffset()) < curRange.getLength()) {
+          currentOffset = desired;
+          currentRange = i;
+          this.range = curRange.getData().duplicate();
+          int pos = range.position();
+          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+          this.range.position(pos);
+          return;
+        }
+        ++i;
+      }
+      // if they are seeking to the precise end, go ahead and let them go there
+      int segments = bytes.size();
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+        currentOffset = desired;
+        currentRange = segments - 1;
+        DiskRange curRange = bytes.get(currentRange);
+        this.range = curRange.getData().duplicate();
+        int pos = range.position();
+        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+        this.range.position(pos);
+        return;
+      }
+      throw new IllegalArgumentException("Seek in " + name + " to " +
+        desired + " is outside of the data");
+    }
+
+    @Override
+    public String toString() {
+      return "uncompressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+    }
+  }
+
+  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+    // TODO: use the same pool as the ORC readers
+    if (isDirect) {
+      return ByteBuffer.allocateDirect(size);
+    } else {
+      return ByteBuffer.allocate(size);
+    }
+  }
+
+  private static class CompressedStream extends InStream {
+    private final List<DiskRange> bytes;
+    private final int bufferSize;
+    private ByteBuffer uncompressed;
+    private final CompressionCodec codec;
+    private ByteBuffer compressed;
+    private long currentOffset;
+    private int currentRange;
+    private boolean isUncompressedOriginal;
+
+    public CompressedStream(String name, List<DiskRange> input, long length,
+                            CompressionCodec codec, int bufferSize) {
+      super(name, length);
+      this.bytes = input;
+      this.codec = codec;
+      this.bufferSize = bufferSize;
+      currentOffset = 0;
+      currentRange = 0;
+    }
+
+    private void allocateForUncompressed(int size, boolean isDirect) {
+      uncompressed = allocateBuffer(size, isDirect);
+    }
+
+    private void readHeader() throws IOException {
+      if (compressed == null || compressed.remaining() <= 0) {
+        seek(currentOffset);
+      }
+      if (compressed.remaining() > OutStream.HEADER_SIZE) {
+        int b0 = compressed.get() & 0xff;
+        int b1 = compressed.get() & 0xff;
+        int b2 = compressed.get() & 0xff;
+        boolean isOriginal = (b0 & 0x01) == 1;
+        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
+        if (chunkLength > bufferSize) {
+          throw new IllegalArgumentException("Buffer size too small. size = " +
+              bufferSize + " needed = " + chunkLength);
+        }
+        // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
+        assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+        currentOffset += OutStream.HEADER_SIZE;
+
+        ByteBuffer slice = this.slice(chunkLength);
+
+        if (isOriginal) {
+          uncompressed = slice;
+          isUncompressedOriginal = true;
+        } else {
+          if (isUncompressedOriginal) {
+            allocateForUncompressed(bufferSize, slice.isDirect());
+            isUncompressedOriginal = false;
+          } else if (uncompressed == null) {
+            allocateForUncompressed(bufferSize, slice.isDirect());
+          } else {
+            uncompressed.clear();
+          }
+          codec.decompress(slice, uncompressed);
+         }
+      } else {
+        throw new IllegalStateException("Can't read header at " + this);
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (uncompressed == null || uncompressed.remaining() == 0) {
+        if (currentOffset == length) {
+          return -1;
+        }
+        readHeader();
+      }
+      return 0xff & uncompressed.get();
+    }
+
+    @Override
+    public int read(byte[] data, int offset, int length) throws IOException {
+      if (uncompressed == null || uncompressed.remaining() == 0) {
+        if (currentOffset == this.length) {
+          return -1;
+        }
+        readHeader();
+      }
+      int actualLength = Math.min(length, uncompressed.remaining());
+      uncompressed.get(data, offset, actualLength);
+      return actualLength;
+    }
+
+    @Override
+    public int available() throws IOException {
+      if (uncompressed == null || uncompressed.remaining() == 0) {
+        if (currentOffset == length) {
+          return 0;
+        }
+        readHeader();
+      }
+      return uncompressed.remaining();
+    }
+
+    @Override
+    public void close() {
+      uncompressed = null;
+      compressed = null;
+      currentRange = bytes.size();
+      currentOffset = length;
+      bytes.clear();
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
+      seek(index.getNext());
+      long uncompressedBytes = index.getNext();
+      if (uncompressedBytes != 0) {
+        readHeader();
+        uncompressed.position(uncompressed.position() +
+                              (int) uncompressedBytes);
+      } else if (uncompressed != null) {
+        // mark the uncompressed buffer as done
+        uncompressed.position(uncompressed.limit());
+      }
+    }
+
+    /* slices a read only contiguous buffer of chunkLength */
+    private ByteBuffer slice(int chunkLength) throws IOException {
+      int len = chunkLength;
+      final long oldOffset = currentOffset;
+      ByteBuffer slice;
+      if (compressed.remaining() >= len) {
+        slice = compressed.slice();
+        // simple case
+        slice.limit(len);
+        currentOffset += len;
+        compressed.position(compressed.position() + len);
+        return slice;
+      } else if (currentRange >= (bytes.size() - 1)) {
+        // nothing has been modified yet
+        throw new IOException("EOF in " + this + " while trying to read " +
+            chunkLength + " bytes");
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format(
+            "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+            compressed.remaining(), len));
+      }
+
+      // we need to consolidate 2 or more buffers into 1
+      // first copy out compressed buffers
+      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+      currentOffset += compressed.remaining();
+      len -= compressed.remaining();
+      copy.put(compressed);
+      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
+
+      while (len > 0 && iter.hasNext()) {
+        ++currentRange;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+        }
+        DiskRange range = iter.next();
+        compressed = range.getData().duplicate();
+        if (compressed.remaining() >= len) {
+          slice = compressed.slice();
+          slice.limit(len);
+          copy.put(slice);
+          currentOffset += len;
+          compressed.position(compressed.position() + len);
+          return copy;
+        }
+        currentOffset += compressed.remaining();
+        len -= compressed.remaining();
+        copy.put(compressed);
+      }
+
+      // restore offsets for exception clarity
+      seek(oldOffset);
+      throw new IOException("EOF in " + this + " while trying to read " +
+          chunkLength + " bytes");
+    }
+
+    private void seek(long desired) throws IOException {
+      if (desired == 0 && bytes.isEmpty()) {
+        logEmptySeek(name);
+        return;
+      }
+      int i = 0;
+      for (DiskRange range : bytes) {
+        if (range.getOffset() <= desired && desired < range.getEnd()) {
+          currentRange = i;
+          compressed = range.getData().duplicate();
+          int pos = compressed.position();
+          pos += (int)(desired - range.getOffset());
+          compressed.position(pos);
+          currentOffset = desired;
+          return;
+        }
+        ++i;
+      }
+      // if they are seeking to the precise end, go ahead and let them go there
+      int segments = bytes.size();
+      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+        DiskRange range = bytes.get(segments - 1);
+        currentRange = segments - 1;
+        compressed = range.getData().duplicate();
+        compressed.position(compressed.limit());
+        currentOffset = desired;
+        return;
+      }
+      throw new IOException("Seek outside of data in " + this + " to " + desired);
+    }
+
+    private String rangeString() {
+      StringBuilder builder = new StringBuilder();
+      int i = 0;
+      for (DiskRange range : bytes) {
+        if (i != 0) {
+          builder.append("; ");
+        }
+        builder.append(" range " + i + " = " + range.getOffset()
+            + " to " + (range.getEnd() - range.getOffset()));
+        ++i;
+      }
+      return builder.toString();
+    }
+
+    @Override
+    public String toString() {
+      return "compressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+          rangeString() +
+          (uncompressed == null ? "" :
+              " uncompressed: " + uncompressed.position() + " to " +
+                  uncompressed.limit());
+    }
+  }
+
+  public abstract void seek(PositionProvider index) throws IOException;
+
+  private static void logEmptySeek(String name) {
+    if (LOG.isWarnEnabled()) {
+      LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
+    }
+  }
+
+  /**
+   * Create an input stream from a list of buffers.
+   * @param streamName the name of the stream
+   * @param buffers the list of ranges of bytes for the stream
+   * @param offsets a list of offsets (the same length as input) that must
+   *                contain the first offset of each set of bytes in input
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
+  @VisibleForTesting
+  @Deprecated
+  public static InStream create(String streamName,
+                                ByteBuffer[] buffers,
+                                long[] offsets,
+                                long length,
+                                CompressionCodec codec,
+                                int bufferSize) throws IOException {
+    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+    for (int i = 0; i < buffers.length; ++i) {
+      input.add(new BufferChunk(buffers[i], offsets[i]));
+    }
+    return create(streamName, input, length, codec, bufferSize);
+  }
+
+  /**
+   * Create an input stream from a list of disk ranges with data.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream; from disk or cache
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
+  public static InStream create(String name,
+                                List<DiskRange> input,
+                                long length,
+                                CompressionCodec codec,
+                                int bufferSize) throws IOException {
+    if (codec == null) {
+      return new UncompressedStream(name, input, length);
+    } else {
+      return new CompressedStream(name, input, length, codec, bufferSize);
+    }
+  }
+
+  /**
+   * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
+   *
+   * @param name       the name of the stream
+   * @param input      the list of ranges of bytes for the stream; from disk or cache
+   * @param length     the length in bytes of the stream
+   * @param codec      the compression codec
+   * @param bufferSize the compression buffer size
+   * @return coded input stream
+   * @throws IOException
+   */
+  public static CodedInputStream createCodedInputStream(
+      String name,
+      List<DiskRange> input,
+      long length,
+      CompressionCodec codec,
+      int bufferSize) throws IOException {
+    InStream inStream = create(name, input, length, codec, bufferSize);
+    CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
+    codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
+    return codedInputStream;
+  }
+}
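
readHeader() in CompressedStream above unpacks ORC's 3-byte chunk header: the low bit of the first byte flags an "original" (stored uncompressed) chunk, and the remaining 23 bits, spread across the three bytes, carry the chunk length. A small worked decode with made-up header bytes:

    public class ChunkHeaderExample {
      public static void main(String[] args) {
        // Example header bytes only; a real header comes from the compressed stream.
        int b0 = 0x40, b1 = 0x0d, b2 = 0x03;
        boolean isOriginal = (b0 & 0x01) == 1;                   // false: chunk is compressed
        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);    // 3*32768 + 13*128 + 32
        System.out.println(isOriginal);     // false
        System.out.println(chunkLength);    // 100000
      }
    }

A decoded chunkLength larger than bufferSize is rejected ("Buffer size too small"), which is why the writer's compression buffer size has to accompany the stream.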

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/IntegerReader.java b/java/core/src/java/org/apache/orc/impl/IntegerReader.java
new file mode 100644
index 0000000..3e64d54
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/IntegerReader.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * Interface for reading integers.
+ */
+public interface IntegerReader {
+
+  /**
+   * Seek to the position provided by index.
+   * @param index
+   * @throws IOException
+   */
+  void seek(PositionProvider index) throws IOException;
+
+  /**
+   * Skip the specified number of rows.
+   * @param numValues
+   * @throws IOException
+   */
+  void skip(long numValues) throws IOException;
+
+  /**
+   * Check if there are any more values left.
+   * @return true if there are more values to read
+   * @throws IOException
+   */
+  boolean hasNext() throws IOException;
+
+  /**
+   * Return the next available value.
+   * @return
+   * @throws IOException
+   */
+  long next() throws IOException;
+
+  /**
+   * Return the next available vector for values.
+   * @param column the column being read
+   * @param data the vector to read into
+   * @param length the number of numbers to read
+   * @throws IOException
+   */
+   void nextVector(ColumnVector column,
+                   long[] data,
+                   int length
+                   ) throws IOException;
+
+  /**
+   * Return the next available vector for values. Does not change the
+   * value of column.isRepeating.
+   * @param column the column being read
+   * @param data the vector to read into
+   * @param length the number of numbers to read
+   * @throws IOException
+   */
+  void nextVector(ColumnVector column,
+                  int[] data,
+                  int length
+                  ) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
new file mode 100644
index 0000000..419054f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+public interface IntegerWriter {
+
+  /**
+   * Record the current position of the stream.
+   * @param recorder the position recorder to update
+   * @throws IOException
+   */
+  void getPosition(PositionRecorder recorder) throws IOException;
+
+  /**
+   * Write the integer value
+   * @param value
+   * @throws IOException
+   */
+  void write(long value) throws IOException;
+
+  /**
+   * Flush the buffer
+   * @throws IOException
+   */
+  void flush() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/MemoryManager.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/MemoryManager.java b/java/core/src/java/org/apache/orc/impl/MemoryManager.java
new file mode 100644
index 0000000..757c0b4
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/MemoryManager.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By scaling down the size of each allocation as more writers are added, we
+ * try to keep the task from running out of memory.
+ * 
+ * This class is not thread safe, but is re-entrant - ensure creation and all
+ * invocations are triggered from the same thread.
+ */
+public class MemoryManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
+
+  /**
+   * How often should we check the memory sizes? Measured in rows added
+   * to all of the writers.
+   */
+  private static final int ROWS_BETWEEN_CHECKS = 5000;
+  private final long totalMemoryPool;
+  private final Map<Path, WriterInfo> writerList =
+      new HashMap<Path, WriterInfo>();
+  private long totalAllocation = 0;
+  private double currentScale = 1;
+  private int rowsAddedSinceCheck = 0;
+  private final OwnedLock ownerLock = new OwnedLock();
+
+  @SuppressWarnings("serial")
+  private static class OwnedLock extends ReentrantLock {
+    public Thread getOwner() {
+      return super.getOwner();
+    }
+  }
+
+  private static class WriterInfo {
+    long allocation;
+    Callback callback;
+    WriterInfo(long allocation, Callback callback) {
+      this.allocation = allocation;
+      this.callback = callback;
+    }
+  }
+
+  public interface Callback {
+    /**
+     * The writer needs to check its memory usage
+     * @param newScale the current scale factor for memory allocations
+     * @return true if the writer was over the limit
+     * @throws IOException
+     */
+    boolean checkMemory(double newScale) throws IOException;
+  }
+
+  /**
+   * Create the memory manager.
+   * @param conf use the configuration to find the maximum size of the memory
+   *             pool.
+   */
+  public MemoryManager(Configuration conf) {
+    double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
+    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+        getHeapMemoryUsage().getMax() * maxLoad);
+    ownerLock.lock();
+  }
+
+  /**
+   * Lightweight thread-safety check that warns when multi-threaded access
+   * is detected.
+   */
+  private void checkOwner() {
+    if (!ownerLock.isHeldByCurrentThread()) {
+      LOG.warn("Owner thread expected {}, got {}",
+          ownerLock.getOwner(), Thread.currentThread());
+    }
+  }
+
+  /**
+   * Add a new writer's memory allocation to the pool. We use the path
+   * as a unique key to ensure that we don't get duplicates.
+   * @param path the file that is being written
+   * @param requestedAllocation the requested buffer size
+   * @param callback the callback used to check the writer's memory usage
+   */
+  public void addWriter(Path path, long requestedAllocation,
+                              Callback callback) throws IOException {
+    checkOwner();
+    WriterInfo oldVal = writerList.get(path);
+    // this should always be null, but we handle the case where the memory
+    // manager wasn't told that a previous writer finished and the task
+    // starts writing to the same path.
+    if (oldVal == null) {
+      oldVal = new WriterInfo(requestedAllocation, callback);
+      writerList.put(path, oldVal);
+      totalAllocation += requestedAllocation;
+    } else {
+      // handle a new writer that is writing to the same path
+      totalAllocation += requestedAllocation - oldVal.allocation;
+      oldVal.allocation = requestedAllocation;
+      oldVal.callback = callback;
+    }
+    updateScale(true);
+  }
+
+  /**
+   * Remove the given writer from the pool.
+   * @param path the file that has been closed
+   */
+  public void removeWriter(Path path) throws IOException {
+    checkOwner();
+    WriterInfo val = writerList.get(path);
+    if (val != null) {
+      writerList.remove(path);
+      totalAllocation -= val.allocation;
+      updateScale(false);
+    }
+    if (writerList.isEmpty()) {
+      rowsAddedSinceCheck = 0;
+    }
+  }
+
+  /**
+   * Get the total pool size that is available for ORC writers.
+   * @return the number of bytes in the pool
+   */
+  public long getTotalMemoryPool() {
+    return totalMemoryPool;
+  }
+
+  /**
+   * The scaling factor for each allocation to ensure that the pool isn't
+   * oversubscribed.
+   * @return a fraction between 0.0 and 1.0 of the requested size that is
+   * available for each writer.
+   */
+  public double getAllocationScale() {
+    return currentScale;
+  }
+
+  /**
+   * Give the memory manager an opportunity for doing a memory check.
+   * @param rows number of rows added
+   * @throws IOException
+   */
+  public void addedRow(int rows) throws IOException {
+    rowsAddedSinceCheck += rows;
+    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+      notifyWriters();
+    }
+  }
+
+  /**
+   * Notify all of the writers that they should check their memory usage.
+   * @throws IOException
+   */
+  public void notifyWriters() throws IOException {
+    checkOwner();
+    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+    for(WriterInfo writer: writerList.values()) {
+      boolean flushed = writer.callback.checkMemory(currentScale);
+      if (LOG.isDebugEnabled() && flushed) {
+        LOG.debug("flushed " + writer.toString());
+      }
+    }
+    rowsAddedSinceCheck = 0;
+  }
+
+  /**
+   * Update the currentScale based on the current allocation and pool size.
+   * @param isAllocate is this an allocation? (currently unused)
+   */
+  private void updateScale(boolean isAllocate) throws IOException {
+    if (totalAllocation <= totalMemoryPool) {
+      currentScale = 1;
+    } else {
+      currentScale = (double) totalMemoryPool / totalAllocation;
+    }
+  }
+}
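
For context, a minimal sketch (not part of this commit) of how a writer might register with the MemoryManager; the path, allocation size, and callback body below are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.MemoryManager;

class MemoryManagerSketch {
  public static void main(String[] args) throws IOException {
    MemoryManager manager = new MemoryManager(new Configuration());
    Path path = new Path("/tmp/example.orc");      // hypothetical output file
    long requestedAllocation = 64L * 1024 * 1024;  // e.g. the stripe size
    manager.addWriter(path, requestedAllocation, new MemoryManager.Callback() {
      @Override
      public boolean checkMemory(double newScale) {
        // a real writer would compare its buffered bytes against
        // requestedAllocation * newScale and flush a stripe if it is over
        return false;
      }
    });
    manager.addedRow(1000);      // called as rows are written
    manager.removeWriter(path);  // called when the writer closes
  }
}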

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java b/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java
new file mode 100644
index 0000000..72c7f54
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.Reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+public class OrcAcidUtils {
+  public static final String ACID_STATS = "hive.acid.stats";
+  public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
+  /**
+   * Get the filename of the ORC ACID side file that contains the lengths
+   * of the intermediate footers.
+   * @param main the main ORC filename
+   * @return the name of the side file
+   */
+  public static Path getSideFile(Path main) {
+    return new Path(main + DELTA_SIDE_FILE_SUFFIX);
+  }
+
+  /**
+   * Read the side file to get the last flush length.
+   * @param fs the file system to use
+   * @param deltaFile the path of the delta file
+   * @return the maximum size of the file to use
+   * @throws IOException
+   */
+  public static long getLastFlushLength(FileSystem fs,
+                                        Path deltaFile) throws IOException {
+    Path lengths = getSideFile(deltaFile);
+    long result = Long.MAX_VALUE;
+    try (FSDataInputStream stream = fs.open(lengths)) {
+      result = -1;
+      while (stream.available() > 0) {
+        result = stream.readLong();
+      }
+      return result;
+    } catch (IOException ioe) {
+      // the side file is missing or unreadable: return what we have so far
+      // (Long.MAX_VALUE if it could not be opened at all)
+      return result;
+    }
+  }
+
+  private static final Charset utf8 = Charset.forName("UTF-8");
+  private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
+
+  public static AcidStats parseAcidStats(Reader reader) {
+    if (reader.hasMetadataValue(ACID_STATS)) {
+      try {
+        ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate();
+        return new AcidStats(utf8Decoder.decode(val).toString());
+      } catch (CharacterCodingException e) {
+        throw new IllegalArgumentException("Bad string encoding for " +
+            ACID_STATS, e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+}
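
As a usage sketch (not part of this commit), the side-file helpers can bound how much of an ACID delta file is safe to read; the local file system and delta path below are hypothetical.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.OrcAcidUtils;

class SideFileSketch {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path delta = new Path("/tmp/delta_0000001_0000001/bucket_00000");
    Path side = OrcAcidUtils.getSideFile(delta);   // "<delta>_flush_length"
    long maxLength = OrcAcidUtils.getLastFlushLength(fs, delta);
    // Long.MAX_VALUE means no side file was readable, so the whole file is
    // used; otherwise only the first maxLength bytes are known to be complete
    System.out.println(side + " => read up to " + maxLength + " bytes");
  }
}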

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcIndex.java b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
new file mode 100644
index 0000000..50a15f2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcProto;
+
+public final class OrcIndex {
+  OrcProto.RowIndex[] rowGroupIndex;
+  OrcProto.BloomFilterIndex[] bloomFilterIndex;
+
+  public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+    this.rowGroupIndex = rgIndex;
+    this.bloomFilterIndex = bfIndex;
+  }
+
+  public OrcProto.RowIndex[] getRowGroupIndex() {
+    return rowGroupIndex;
+  }
+
+  public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
+    return bloomFilterIndex;
+  }
+
+  public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
+    this.rowGroupIndex = rowGroupIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
new file mode 100644
index 0000000..81662cc
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class OutStream extends PositionedOutputStream {
+
+  public interface OutputReceiver {
+    /**
+     * Output the given buffer to the final destination
+     * @param buffer the buffer to output
+     * @throws IOException
+     */
+    void output(ByteBuffer buffer) throws IOException;
+  }
+
+  public static final int HEADER_SIZE = 3;
+  private final String name;
+  private final OutputReceiver receiver;
+  // if enabled, the stream will be suppressed when writing the stripe
+  private boolean suppress;
+
+  /**
+   * Stores the uncompressed bytes that have been serialized, but not
+   * compressed yet. When this fills, we compress the entire buffer.
+   */
+  private ByteBuffer current = null;
+
+  /**
+   * Stores the compressed bytes until we have a full buffer and then outputs
+   * them to the receiver. If no compression is being done, this (and overflow)
+   * will always be null and the current buffer will be sent directly to the
+   * receiver.
+   */
+  private ByteBuffer compressed = null;
+
+  /**
+   * Since the compressed buffer may start with contents from previous
+   * compression blocks, we allocate an overflow buffer so that the
+   * output of the codec can be split between the two buffers. After the
+   * compressed buffer is sent to the receiver, the overflow buffer becomes
+   * the new compressed buffer.
+   */
+  private ByteBuffer overflow = null;
+  private final int bufferSize;
+  private final CompressionCodec codec;
+  private long compressedBytes = 0;
+  private long uncompressedBytes = 0;
+
+  public OutStream(String name,
+                   int bufferSize,
+                   CompressionCodec codec,
+                   OutputReceiver receiver) throws IOException {
+    this.name = name;
+    this.bufferSize = bufferSize;
+    this.codec = codec;
+    this.receiver = receiver;
+    this.suppress = false;
+  }
+
+  public void clear() throws IOException {
+    flush();
+    suppress = false;
+  }
+
+  /**
+   * Write the length of the compressed bytes. Life is much easier if the
+   * header is a constant length, so just use 3 bytes. Since most codecs want
+   * buffers between 32k (snappy) and 256k (lzo, zlib), 3 bytes is plenty.
+   * The low bit records whether the chunk holds the original (uncompressed)
+   * or the compressed bytes.
+   * @param buffer the buffer to write the header to
+   * @param position the position in the buffer to write at
+   * @param val the number of bytes in the chunk
+   * @param original true if the bytes are uncompressed
+   */
+  private static void writeHeader(ByteBuffer buffer,
+                                  int position,
+                                  int val,
+                                  boolean original) {
+    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
+    buffer.put(position + 1, (byte) (val >> 7));
+    buffer.put(position + 2, (byte) (val >> 15));
+  }
+
+  private void getNewInputBuffer() throws IOException {
+    if (codec == null) {
+      current = ByteBuffer.allocate(bufferSize);
+    } else {
+      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+      writeHeader(current, 0, bufferSize, true);
+      current.position(HEADER_SIZE);
+    }
+  }
+
+  /**
+   * Allocate a new output buffer if we are compressing.
+   */
+  private ByteBuffer getNewOutputBuffer() throws IOException {
+    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+  }
+
+  private void flip() throws IOException {
+    current.limit(current.position());
+    current.position(codec == null ? 0 : HEADER_SIZE);
+  }
+
+  @Override
+  public void write(int i) throws IOException {
+    if (current == null) {
+      getNewInputBuffer();
+    }
+    if (current.remaining() < 1) {
+      spill();
+    }
+    uncompressedBytes += 1;
+    current.put((byte) i);
+  }
+
+  @Override
+  public void write(byte[] bytes, int offset, int length) throws IOException {
+    if (current == null) {
+      getNewInputBuffer();
+    }
+    int remaining = Math.min(current.remaining(), length);
+    current.put(bytes, offset, remaining);
+    uncompressedBytes += remaining;
+    length -= remaining;
+    while (length != 0) {
+      spill();
+      offset += remaining;
+      remaining = Math.min(current.remaining(), length);
+      current.put(bytes, offset, remaining);
+      uncompressedBytes += remaining;
+      length -= remaining;
+    }
+  }
+
+  private void spill() throws java.io.IOException {
+    // if there isn't anything in the current buffer, don't spill
+    if (current == null ||
+        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+      return;
+    }
+    flip();
+    if (codec == null) {
+      receiver.output(current);
+      getNewInputBuffer();
+    } else {
+      if (compressed == null) {
+        compressed = getNewOutputBuffer();
+      } else if (overflow == null) {
+        overflow = getNewOutputBuffer();
+      }
+      int sizePosn = compressed.position();
+      compressed.position(compressed.position() + HEADER_SIZE);
+      if (codec.compress(current, compressed, overflow)) {
+        uncompressedBytes = 0;
+        // move position back to after the header
+        current.position(HEADER_SIZE);
+        current.limit(current.capacity());
+        // find the total bytes in the chunk
+        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
+        if (overflow != null) {
+          totalBytes += overflow.position();
+        }
+        compressedBytes += totalBytes + HEADER_SIZE;
+        writeHeader(compressed, sizePosn, totalBytes, false);
+        // if we have less than the next header left, spill it.
+        if (compressed.remaining() < HEADER_SIZE) {
+          compressed.flip();
+          receiver.output(compressed);
+          compressed = overflow;
+          overflow = null;
+        }
+      } else {
+        compressedBytes += uncompressedBytes + HEADER_SIZE;
+        uncompressedBytes = 0;
+        // we are using the original, but need to spill the current
+        // compressed buffer first. So back up to where we started,
+        // flip it, and send it to the receiver.
+        if (sizePosn != 0) {
+          compressed.position(sizePosn);
+          compressed.flip();
+          receiver.output(compressed);
+          compressed = null;
+          // if we have an overflow, clear it and make it the new compress
+          // buffer
+          if (overflow != null) {
+            overflow.clear();
+            compressed = overflow;
+            overflow = null;
+          }
+        } else {
+          compressed.clear();
+          if (overflow != null) {
+            overflow.clear();
+          }
+        }
+
+        // now send the current buffer to the receiver and get a new one.
+        current.position(0);
+        // update the header with the current length
+        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
+        receiver.output(current);
+        getNewInputBuffer();
+      }
+    }
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    if (codec == null) {
+      recorder.addPosition(uncompressedBytes);
+    } else {
+      recorder.addPosition(compressedBytes);
+      recorder.addPosition(uncompressedBytes);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    spill();
+    if (compressed != null && compressed.position() != 0) {
+      compressed.flip();
+      receiver.output(compressed);
+    }
+    compressed = null;
+    uncompressedBytes = 0;
+    compressedBytes = 0;
+    overflow = null;
+    current = null;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public long getBufferSize() {
+    long result = 0;
+    if (current != null) {
+      result += current.capacity();
+    }
+    if (compressed != null) {
+      result += compressed.capacity();
+    }
+    if (overflow != null) {
+      result += overflow.capacity();
+    }
+    return result;
+  }
+
+  /**
+   * Set the suppress flag.
+   */
+  public void suppress() {
+    suppress = true;
+  }
+
+  /**
+   * Return the state of the suppress flag.
+   * @return the value of the suppress flag
+   */
+  public boolean isSuppressed() {
+    return suppress;
+  }
+}
+
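
To make the chunk layout concrete, here is a small sketch (not part of this commit) that decodes the 3-byte header written by writeHeader above and shows an OutputReceiver that simply collects the emitted buffers; the class and variable names are illustrative only. Note that with a null codec the stream emits raw buffers without headers.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.orc.impl.OutStream;

class OutStreamSketch {
  // decode the 3-byte little-endian header: low bit = "original" flag,
  // remaining 23 bits = chunk length
  static int chunkLength(ByteBuffer buf, int pos) {
    int combined = (buf.get(pos) & 0xff)
        | ((buf.get(pos + 1) & 0xff) << 8)
        | ((buf.get(pos + 2) & 0xff) << 16);
    return combined >>> 1;
  }

  static boolean isOriginal(ByteBuffer buf, int pos) {
    return (buf.get(pos) & 1) == 1;
  }

  public static void main(String[] args) throws IOException {
    final List<ByteBuffer> chunks = new ArrayList<ByteBuffer>();
    OutStream.OutputReceiver receiver = new OutStream.OutputReceiver() {
      @Override
      public void output(ByteBuffer buffer) {
        chunks.add(buffer);   // collect every buffer the stream emits
      }
    };
    // null codec: buffers are passed to the receiver uncompressed
    OutStream out = new OutStream("sketch", 64 * 1024, null, receiver);
    out.write(new byte[]{1, 2, 3});
    out.flush();
    System.out.println("received " + chunks.size() + " buffer(s)");
  }
}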

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionProvider.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PositionProvider.java b/java/core/src/java/org/apache/orc/impl/PositionProvider.java
new file mode 100644
index 0000000..47cf481
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PositionProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+public interface PositionProvider {
+  long getNext();
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PositionRecorder.java b/java/core/src/java/org/apache/orc/impl/PositionRecorder.java
new file mode 100644
index 0000000..1fff760
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+public interface PositionRecorder {
+  void addPosition(long offset);
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java b/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java
new file mode 100644
index 0000000..d412939
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionedOutputStream extends OutputStream {
+
+  /**
+   * Record the current position to the recorder.
+   * @param recorder the object that receives the position
+   * @throws IOException
+   */
+  public abstract void getPosition(PositionRecorder recorder
+                                   ) throws IOException;
+
+  /**
+   * Get the memory size currently allocated as buffer associated with this
+   * stream.
+   * @return the number of bytes used by buffers.
+   */
+  public abstract long getBufferSize();
+}
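
Taken together, PositionRecorder and PositionProvider form the write side and read side of the same bookkeeping. A minimal sketch (not part of this commit, with illustrative names) pairing them:

import java.util.ArrayList;
import java.util.List;

import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.PositionRecorder;

class ListPositionRecorder implements PositionRecorder {
  private final List<Long> positions = new ArrayList<Long>();

  @Override
  public void addPosition(long offset) {
    positions.add(offset);   // record each position as the writer reports it
  }

  // replay the recorded positions in order, e.g. when seeking within a stream
  PositionProvider provider() {
    return new PositionProvider() {
      private int next = 0;

      @Override
      public long getNext() {
        return positions.get(next++);
      }
    };
  }
}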