You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2017/02/03 16:38:16 UTC

[17/22] hive git commit: HIVE-14007. Replace hive-orc module with ORC 1.3.1

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
deleted file mode 100644
index 22301e8..0000000
--- a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionKind;
-
-import javax.annotation.Nullable;
-
-public final class DataReaderProperties {
-
-  private final FileSystem fileSystem;
-  private final Path path;
-  private final CompressionKind compression;
-  private final boolean zeroCopy;
-  private final int typeCount;
-  private final int bufferSize;
-
-  private DataReaderProperties(Builder builder) {
-    this.fileSystem = builder.fileSystem;
-    this.path = builder.path;
-    this.compression = builder.compression;
-    this.zeroCopy = builder.zeroCopy;
-    this.typeCount = builder.typeCount;
-    this.bufferSize = builder.bufferSize;
-  }
-
-  public FileSystem getFileSystem() {
-    return fileSystem;
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-  public CompressionKind getCompression() {
-    return compression;
-  }
-
-  public boolean getZeroCopy() {
-    return zeroCopy;
-  }
-
-  public int getTypeCount() {
-    return typeCount;
-  }
-
-  public int getBufferSize() {
-    return bufferSize;
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder {
-
-    private FileSystem fileSystem;
-    private Path path;
-    private CompressionKind compression;
-    private boolean zeroCopy;
-    private int typeCount;
-    private int bufferSize;
-
-    private Builder() {
-
-    }
-
-    public Builder withFileSystem(FileSystem fileSystem) {
-      this.fileSystem = fileSystem;
-      return this;
-    }
-
-    public Builder withPath(Path path) {
-      this.path = path;
-      return this;
-    }
-
-    public Builder withCompression(CompressionKind value) {
-      this.compression = value;
-      return this;
-    }
-
-    public Builder withZeroCopy(boolean zeroCopy) {
-      this.zeroCopy = zeroCopy;
-      return this;
-    }
-
-    public Builder withTypeCount(int value) {
-      this.typeCount = value;
-      return this;
-    }
-
-    public Builder withBufferSize(int value) {
-      this.bufferSize = value;
-      return this;
-    }
-
-    public DataReaderProperties build() {
-      Preconditions.checkNotNull(fileSystem);
-      Preconditions.checkNotNull(path);
-
-      return new DataReaderProperties(this);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java b/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
deleted file mode 100644
index 7e0110d..0000000
--- a/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import org.apache.orc.CompressionCodec;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface DirectDecompressionCodec extends CompressionCodec {
-  public boolean isAvailable();
-  public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DynamicByteArray.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DynamicByteArray.java b/orc/src/java/org/apache/orc/impl/DynamicByteArray.java
deleted file mode 100644
index 986c2ac..0000000
--- a/orc/src/java/org/apache/orc/impl/DynamicByteArray.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.Text;
-
-/**
- * A class that is a growable array of bytes. Growth is managed in terms of
- * chunks that are allocated when needed.
- */
-public final class DynamicByteArray {
-  static final int DEFAULT_CHUNKSIZE = 32 * 1024;
-  static final int DEFAULT_NUM_CHUNKS = 128;
-
-  private final int chunkSize;        // our allocation sizes
-  private byte[][] data;              // the real data
-  private int length;                 // max set element index +1
-  private int initializedChunks = 0;  // the number of chunks created
-
-  public DynamicByteArray() {
-    this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
-  }
-
-  public DynamicByteArray(int numChunks, int chunkSize) {
-    if (chunkSize == 0) {
-      throw new IllegalArgumentException("bad chunksize");
-    }
-    this.chunkSize = chunkSize;
-    data = new byte[numChunks][];
-  }
-
-  /**
-   * Ensure that the given index is valid.
-   */
-  private void grow(int chunkIndex) {
-    if (chunkIndex >= initializedChunks) {
-      if (chunkIndex >= data.length) {
-        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
-        byte[][] newChunk = new byte[newSize][];
-        System.arraycopy(data, 0, newChunk, 0, data.length);
-        data = newChunk;
-      }
-      for(int i=initializedChunks; i <= chunkIndex; ++i) {
-        data[i] = new byte[chunkSize];
-      }
-      initializedChunks = chunkIndex + 1;
-    }
-  }
-
-  public byte get(int index) {
-    if (index >= length) {
-      throw new IndexOutOfBoundsException("Index " + index +
-                                            " is outside of 0.." +
-                                            (length - 1));
-    }
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    return data[i][j];
-  }
-
-  public void set(int index, byte value) {
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    grow(i);
-    if (index >= length) {
-      length = index + 1;
-    }
-    data[i][j] = value;
-  }
-
-  public int add(byte value) {
-    int i = length / chunkSize;
-    int j = length % chunkSize;
-    grow(i);
-    data[i][j] = value;
-    int result = length;
-    length += 1;
-    return result;
-  }
-
-  /**
-   * Copy a slice of a byte array into our buffer.
-   * @param value the array to copy from
-   * @param valueOffset the first location to copy from value
-   * @param valueLength the number of bytes to copy from value
-   * @return the offset of the start of the value
-   */
-  public int add(byte[] value, int valueOffset, int valueLength) {
-    int i = length / chunkSize;
-    int j = length % chunkSize;
-    grow((length + valueLength) / chunkSize);
-    int remaining = valueLength;
-    while (remaining > 0) {
-      int size = Math.min(remaining, chunkSize - j);
-      System.arraycopy(value, valueOffset, data[i], j, size);
-      remaining -= size;
-      valueOffset += size;
-      i += 1;
-      j = 0;
-    }
-    int result = length;
-    length += valueLength;
-    return result;
-  }
-
-  /**
-   * Read the entire stream into this array.
-   * @param in the stream to read from
-   * @throws IOException
-   */
-  public void readAll(InputStream in) throws IOException {
-    int currentChunk = length / chunkSize;
-    int currentOffset = length % chunkSize;
-    grow(currentChunk);
-    int currentLength = in.read(data[currentChunk], currentOffset,
-      chunkSize - currentOffset);
-    while (currentLength > 0) {
-      length += currentLength;
-      currentOffset = length % chunkSize;
-      if (currentOffset == 0) {
-        currentChunk = length / chunkSize;
-        grow(currentChunk);
-      }
-      currentLength = in.read(data[currentChunk], currentOffset,
-        chunkSize - currentOffset);
-    }
-  }
-
-  /**
-   * Byte compare a set of bytes against the bytes in this dynamic array.
-   * @param other source of the other bytes
-   * @param otherOffset start offset in the other array
-   * @param otherLength number of bytes in the other array
-   * @param ourOffset the offset in our array
-   * @param ourLength the number of bytes in our array
-   * @return negative for less, 0 for equal, positive for greater
-   */
-  public int compare(byte[] other, int otherOffset, int otherLength,
-                     int ourOffset, int ourLength) {
-    int currentChunk = ourOffset / chunkSize;
-    int currentOffset = ourOffset % chunkSize;
-    int maxLength = Math.min(otherLength, ourLength);
-    while (maxLength > 0 &&
-      other[otherOffset] == data[currentChunk][currentOffset]) {
-      otherOffset += 1;
-      currentOffset += 1;
-      if (currentOffset == chunkSize) {
-        currentChunk += 1;
-        currentOffset = 0;
-      }
-      maxLength -= 1;
-    }
-    if (maxLength == 0) {
-      return otherLength - ourLength;
-    }
-    int otherByte = 0xff & other[otherOffset];
-    int ourByte = 0xff & data[currentChunk][currentOffset];
-    return otherByte > ourByte ? 1 : -1;
-  }
-
-  /**
-   * Get the size of the array.
-   * @return the number of bytes in the array
-   */
-  public int size() {
-    return length;
-  }
-
-  /**
-   * Clear the array to its original pristine state.
-   */
-  public void clear() {
-    length = 0;
-    for(int i=0; i < data.length; ++i) {
-      data[i] = null;
-    }
-    initializedChunks = 0;
-  }
-
-  /**
-   * Set a text value from the bytes in this dynamic array.
-   * @param result the value to set
-   * @param offset the start of the bytes to copy
-   * @param length the number of bytes to copy
-   */
-  public void setText(Text result, int offset, int length) {
-    result.clear();
-    int currentChunk = offset / chunkSize;
-    int currentOffset = offset % chunkSize;
-    int currentLength = Math.min(length, chunkSize - currentOffset);
-    while (length > 0) {
-      result.append(data[currentChunk], currentOffset, currentLength);
-      length -= currentLength;
-      currentChunk += 1;
-      currentOffset = 0;
-      currentLength = Math.min(length, chunkSize - currentOffset);
-    }
-  }
-
-  /**
-   * Write out a range of this dynamic array to an output stream.
-   * @param out the stream to write to
-   * @param offset the first offset to write
-   * @param length the number of bytes to write
-   * @throws IOException
-   */
-  public void write(OutputStream out, int offset,
-                    int length) throws IOException {
-    int currentChunk = offset / chunkSize;
-    int currentOffset = offset % chunkSize;
-    while (length > 0) {
-      int currentLength = Math.min(length, chunkSize - currentOffset);
-      out.write(data[currentChunk], currentOffset, currentLength);
-      length -= currentLength;
-      currentChunk += 1;
-      currentOffset = 0;
-    }
-  }
-
-  @Override
-  public String toString() {
-    int i;
-    StringBuilder sb = new StringBuilder(length * 3);
-
-    sb.append('{');
-    int l = length - 1;
-    for (i=0; i<l; i++) {
-      sb.append(Integer.toHexString(get(i)));
-      sb.append(',');
-    }
-    sb.append(get(i));
-    sb.append('}');
-
-    return sb.toString();
-  }
-
-  public void setByteBuffer(ByteBuffer result, int offset, int length) {
-    result.clear();
-    int currentChunk = offset / chunkSize;
-    int currentOffset = offset % chunkSize;
-    int currentLength = Math.min(length, chunkSize - currentOffset);
-    while (length > 0) {
-      result.put(data[currentChunk], currentOffset, currentLength);
-      length -= currentLength;
-      currentChunk += 1;
-      currentOffset = 0;
-      currentLength = Math.min(length, chunkSize - currentOffset);
-    }
-  }
-
-  /**
-   * Gets all the bytes of the array.
-   *
-   * @return Bytes of the array
-   */
-  public byte[] get() {
-    byte[] result = null;
-    if (length > 0) {
-      int currentChunk = 0;
-      int currentOffset = 0;
-      int currentLength = Math.min(length, chunkSize);
-      int destOffset = 0;
-      result = new byte[length];
-      int totalLength = length;
-      while (totalLength > 0) {
-        System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
-        destOffset += currentLength;
-        totalLength -= currentLength;
-        currentChunk += 1;
-        currentOffset = 0;
-        currentLength = Math.min(totalLength, chunkSize - currentOffset);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Get the size of the buffers.
-   */
-  public long getSizeInBytes() {
-    return initializedChunks * chunkSize;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DynamicIntArray.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DynamicIntArray.java b/orc/src/java/org/apache/orc/impl/DynamicIntArray.java
deleted file mode 100644
index 3b2884b..0000000
--- a/orc/src/java/org/apache/orc/impl/DynamicIntArray.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-/**
- * Dynamic int array that uses primitive types and chunks to avoid copying
- * large number of integers when it resizes.
- *
- * The motivation for this class is memory optimization, i.e. space efficient
- * storage of potentially huge arrays without good a-priori size guesses.
- *
- * The API of this class is between a primitive array and a AbstractList. It's
- * not a Collection implementation because it handles primitive types, but the
- * API could be extended to support iterators and the like.
- *
- * NOTE: Like standard Collection implementations/arrays, this class is not
- * synchronized.
- */
-public final class DynamicIntArray {
-  static final int DEFAULT_CHUNKSIZE = 8 * 1024;
-  static final int INIT_CHUNKS = 128;
-
-  private final int chunkSize;       // our allocation size
-  private int[][] data;              // the real data
-  private int length;                // max set element index +1
-  private int initializedChunks = 0; // the number of created chunks
-
-  public DynamicIntArray() {
-    this(DEFAULT_CHUNKSIZE);
-  }
-
-  public DynamicIntArray(int chunkSize) {
-    this.chunkSize = chunkSize;
-
-    data = new int[INIT_CHUNKS][];
-  }
-
-  /**
-   * Ensure that the given index is valid.
-   */
-  private void grow(int chunkIndex) {
-    if (chunkIndex >= initializedChunks) {
-      if (chunkIndex >= data.length) {
-        int newSize = Math.max(chunkIndex + 1, 2 * data.length);
-        int[][] newChunk = new int[newSize][];
-        System.arraycopy(data, 0, newChunk, 0, data.length);
-        data = newChunk;
-      }
-      for (int i=initializedChunks; i <= chunkIndex; ++i) {
-        data[i] = new int[chunkSize];
-      }
-      initializedChunks = chunkIndex + 1;
-    }
-  }
-
-  public int get(int index) {
-    if (index >= length) {
-      throw new IndexOutOfBoundsException("Index " + index +
-                                            " is outside of 0.." +
-                                            (length - 1));
-    }
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    return data[i][j];
-  }
-
-  public void set(int index, int value) {
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    grow(i);
-    if (index >= length) {
-      length = index + 1;
-    }
-    data[i][j] = value;
-  }
-
-  public void increment(int index, int value) {
-    int i = index / chunkSize;
-    int j = index % chunkSize;
-    grow(i);
-    if (index >= length) {
-      length = index + 1;
-    }
-    data[i][j] += value;
-  }
-
-  public void add(int value) {
-    int i = length / chunkSize;
-    int j = length % chunkSize;
-    grow(i);
-    data[i][j] = value;
-    length += 1;
-  }
-
-  public int size() {
-    return length;
-  }
-
-  public void clear() {
-    length = 0;
-    for(int i=0; i < data.length; ++i) {
-      data[i] = null;
-    }
-    initializedChunks = 0;
-  }
-
-  public String toString() {
-    int i;
-    StringBuilder sb = new StringBuilder(length * 4);
-
-    sb.append('{');
-    int l = length - 1;
-    for (i=0; i<l; i++) {
-      sb.append(get(i));
-      sb.append(',');
-    }
-    sb.append(get(i));
-    sb.append('}');
-
-    return sb.toString();
-  }
-
-  public int getSizeInBytes() {
-    return 4 * initializedChunks * chunkSize;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims.java b/orc/src/java/org/apache/orc/impl/HadoopShims.java
deleted file mode 100644
index ef7d70f..0000000
--- a/orc/src/java/org/apache/orc/impl/HadoopShims.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.VersionInfo;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-public interface HadoopShims {
-
-  enum DirectCompressionType {
-    NONE,
-    ZLIB_NOHEADER,
-    ZLIB,
-    SNAPPY,
-  }
-
-  interface DirectDecompressor {
-    void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException;
-  }
-
-  /**
-   * Get a direct decompressor codec, if it is available
-   * @param codec
-   * @return
-   */
-  DirectDecompressor getDirectDecompressor(DirectCompressionType codec);
-
-  /**
-   * a hadoop.io ByteBufferPool shim.
-   */
-  public interface ByteBufferPoolShim {
-    /**
-     * Get a new ByteBuffer from the pool.  The pool can provide this from
-     * removing a buffer from its internal cache, or by allocating a
-     * new buffer.
-     *
-     * @param direct     Whether the buffer should be direct.
-     * @param length     The minimum length the buffer will have.
-     * @return           A new ByteBuffer. Its capacity can be less
-     *                   than what was requested, but must be at
-     *                   least 1 byte.
-     */
-    ByteBuffer getBuffer(boolean direct, int length);
-
-    /**
-     * Release a buffer back to the pool.
-     * The pool may choose to put this buffer into its cache/free it.
-     *
-     * @param buffer    a direct bytebuffer
-     */
-    void putBuffer(ByteBuffer buffer);
-  }
-
-  /**
-   * Provides an HDFS ZeroCopyReader shim.
-   * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to)
-   * @param in ByteBufferPoolShim to allocate fallback buffers with
-   *
-   * @return returns null if not supported
-   */
-  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
-
-  public interface ZeroCopyReaderShim extends Closeable {
-    /**
-     * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or an MappedByteBuffer.
-     * Also move the in stream by that amount. The data read can be small than maxLength.
-     *
-     * @return ByteBuffer read from the stream,
-     */
-    public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
-    /**
-     * Release a ByteBuffer obtained from a read on the
-     * Also move the in stream by that amount. The data read can be small than maxLength.
-     *
-     */
-    public void releaseBuffer(ByteBuffer buffer);
-
-    /**
-     * Close the underlying stream.
-     * @throws IOException
-     */
-    public void close() throws IOException;
-  }
-  /**
-   * Read data into a Text object in the fastest way possible
-   */
-  public interface TextReaderShim {
-    /**
-     * @param txt
-     * @param size
-     * @return bytes read
-     * @throws IOException
-     */
-    void read(Text txt, int size) throws IOException;
-  }
-
-  /**
-   * Wrap a TextReaderShim around an input stream. The reader shim will not
-   * buffer any reads from the underlying stream and will only consume bytes
-   * which are required for TextReaderShim.read() input.
-   */
-  public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
-
-  class Factory {
-    private static HadoopShims SHIMS = null;
-
-    public static synchronized HadoopShims get() {
-      if (SHIMS == null) {
-        String[] versionParts = VersionInfo.getVersion().split("[.]");
-        int major = Integer.parseInt(versionParts[0]);
-        int minor = Integer.parseInt(versionParts[1]);
-        if (major < 2 || (major == 2 && minor < 3)) {
-          SHIMS = new HadoopShims_2_2();
-        } else {
-          SHIMS = new HadoopShimsCurrent();
-        }
-      }
-      return SHIMS;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
deleted file mode 100644
index 5c53f74..0000000
--- a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
-import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- * Shims for recent versions of Hadoop
- */
-public class HadoopShimsCurrent implements HadoopShims {
-
-  private static class DirectDecompressWrapper implements DirectDecompressor {
-    private final org.apache.hadoop.io.compress.DirectDecompressor root;
-
-    DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) {
-      this.root = root;
-    }
-
-    public void decompress(ByteBuffer input,
-                           ByteBuffer output) throws IOException {
-      root.decompress(input, output);
-    }
-  }
-
-  public DirectDecompressor getDirectDecompressor(
-      DirectCompressionType codec) {
-    switch (codec) {
-      case ZLIB:
-        return new DirectDecompressWrapper
-            (new ZlibDecompressor.ZlibDirectDecompressor());
-      case ZLIB_NOHEADER:
-        return new DirectDecompressWrapper
-            (new ZlibDecompressor.ZlibDirectDecompressor
-                (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
-      case SNAPPY:
-        return new DirectDecompressWrapper
-            (new SnappyDecompressor.SnappyDirectDecompressor());
-      default:
-        return null;
-    }
-  }
-
-  @Override
-  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
-                                              ByteBufferPoolShim pool
-                                              ) throws IOException {
-    return ZeroCopyShims.getZeroCopyReader(in, pool);
-  }
-
-  private final class FastTextReaderShim implements TextReaderShim {
-    private final DataInputStream din;
-
-    public FastTextReaderShim(InputStream in) {
-      this.din = new DataInputStream(in);
-    }
-
-    @Override
-    public void read(Text txt, int len) throws IOException {
-      txt.readWithKnownLength(din, len);
-    }
-  }
-
-  @Override
-  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
-    return new FastTextReaderShim(in);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
deleted file mode 100644
index 3f65e74..0000000
--- a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-
-/**
- * Shims for versions of Hadoop up to and including 2.2.x
- */
-public class HadoopShims_2_2 implements HadoopShims {
-
-  final boolean zeroCopy;
-  final boolean fastRead;
-
-  HadoopShims_2_2() {
-    boolean zcr = false;
-    try {
-      Class.forName("org.apache.hadoop.fs.CacheFlag", false,
-        HadoopShims_2_2.class.getClassLoader());
-      zcr = true;
-    } catch (ClassNotFoundException ce) {
-    }
-    zeroCopy = zcr;
-    boolean fastRead = false;
-    if (zcr) {
-      for (Method m : Text.class.getMethods()) {
-        if ("readWithKnownLength".equals(m.getName())) {
-          fastRead = true;
-        }
-      }
-    }
-    this.fastRead = fastRead;
-  }
-
-  public DirectDecompressor getDirectDecompressor(
-      DirectCompressionType codec) {
-    return null;
-  }
-
-  @Override
-  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
-                                              ByteBufferPoolShim pool
-                                              ) throws IOException {
-    if(zeroCopy) {
-      return ZeroCopyShims.getZeroCopyReader(in, pool);
-    }
-    /* not supported */
-    return null;
-  }
-
-  private final class BasicTextReaderShim implements TextReaderShim {
-    private final InputStream in;
-
-    public BasicTextReaderShim(InputStream in) {
-      this.in = in;
-    }
-
-    @Override
-    public void read(Text txt, int len) throws IOException {
-      int offset = 0;
-      byte[] bytes = new byte[len];
-      while (len > 0) {
-        int written = in.read(bytes, offset, len);
-        if (written < 0) {
-          throw new EOFException("Can't finish read from " + in + " read "
-              + (offset) + " bytes out of " + bytes.length);
-        }
-        len -= written;
-        offset += written;
-      }
-      txt.set(bytes);
-    }
-  }
-
-  @Override
-  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
-    return new BasicTextReaderShim(in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
deleted file mode 100644
index 851f645..0000000
--- a/orc/src/java/org/apache/orc/impl/InStream.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.apache.orc.CompressionCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.CodedInputStream;
-
-public abstract class InStream extends InputStream {
-
-  private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
-  public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
-
-  protected final String name;
-  protected long length;
-
-  public InStream(String name, long length) {
-    this.name = name;
-    this.length = length;
-  }
-
-  public String getStreamName() {
-    return name;
-  }
-
-  public long getStreamLength() {
-    return length;
-  }
-
-  @Override
-  public abstract void close();
-
-  public static class UncompressedStream extends InStream {
-    private List<DiskRange> bytes;
-    private long length;
-    protected long currentOffset;
-    private ByteBuffer range;
-    private int currentRange;
-
-    public UncompressedStream(String name, List<DiskRange> input, long length) {
-      super(name, length);
-      reset(input, length);
-    }
-
-    protected void reset(List<DiskRange> input, long length) {
-      this.bytes = input;
-      this.length = length;
-      currentRange = 0;
-      currentOffset = 0;
-      range = null;
-    }
-
-    @Override
-    public int read() {
-      if (range == null || range.remaining() == 0) {
-        if (currentOffset == length) {
-          return -1;
-        }
-        seek(currentOffset);
-      }
-      currentOffset += 1;
-      return 0xff & range.get();
-    }
-
-    @Override
-    public int read(byte[] data, int offset, int length) {
-      if (range == null || range.remaining() == 0) {
-        if (currentOffset == this.length) {
-          return -1;
-        }
-        seek(currentOffset);
-      }
-      int actualLength = Math.min(length, range.remaining());
-      range.get(data, offset, actualLength);
-      currentOffset += actualLength;
-      return actualLength;
-    }
-
-    @Override
-    public int available() {
-      if (range != null && range.remaining() > 0) {
-        return range.remaining();
-      }
-      return (int) (length - currentOffset);
-    }
-
-    @Override
-    public void close() {
-      currentRange = bytes.size();
-      currentOffset = length;
-      // explicit de-ref of bytes[]
-      bytes.clear();
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      seek(index.getNext());
-    }
-
-    public void seek(long desired) {
-      if (desired == 0 && bytes.isEmpty()) {
-        logEmptySeek(name);
-        return;
-      }
-      int i = 0;
-      for (DiskRange curRange : bytes) {
-        if (desired == 0 && curRange.getData().remaining() == 0) {
-          logEmptySeek(name);
-          return;
-        }
-        if (curRange.getOffset() <= desired &&
-            (desired - curRange.getOffset()) < curRange.getLength()) {
-          currentOffset = desired;
-          currentRange = i;
-          this.range = curRange.getData().duplicate();
-          int pos = range.position();
-          pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
-          this.range.position(pos);
-          return;
-        }
-        ++i;
-      }
-      // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
-        currentOffset = desired;
-        currentRange = segments - 1;
-        DiskRange curRange = bytes.get(currentRange);
-        this.range = curRange.getData().duplicate();
-        int pos = range.position();
-        pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
-        this.range.position(pos);
-        return;
-      }
-      throw new IllegalArgumentException("Seek in " + name + " to " +
-        desired + " is outside of the data");
-    }
-
-    @Override
-    public String toString() {
-      return "uncompressed stream " + name + " position: " + currentOffset +
-          " length: " + length + " range: " + currentRange +
-          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
-    }
-  }
-
-  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
-    // TODO: use the same pool as the ORC readers
-    if (isDirect) {
-      return ByteBuffer.allocateDirect(size);
-    } else {
-      return ByteBuffer.allocate(size);
-    }
-  }
-
-  private static class CompressedStream extends InStream {
-    private final List<DiskRange> bytes;
-    private final int bufferSize;
-    private ByteBuffer uncompressed;
-    private final CompressionCodec codec;
-    private ByteBuffer compressed;
-    private long currentOffset;
-    private int currentRange;
-    private boolean isUncompressedOriginal;
-
-    public CompressedStream(String name, List<DiskRange> input, long length,
-                            CompressionCodec codec, int bufferSize) {
-      super(name, length);
-      this.bytes = input;
-      this.codec = codec;
-      this.bufferSize = bufferSize;
-      currentOffset = 0;
-      currentRange = 0;
-    }
-
-    private void allocateForUncompressed(int size, boolean isDirect) {
-      uncompressed = allocateBuffer(size, isDirect);
-    }
-
-    private void readHeader() throws IOException {
-      if (compressed == null || compressed.remaining() <= 0) {
-        seek(currentOffset);
-      }
-      if (compressed.remaining() > OutStream.HEADER_SIZE) {
-        int b0 = compressed.get() & 0xff;
-        int b1 = compressed.get() & 0xff;
-        int b2 = compressed.get() & 0xff;
-        boolean isOriginal = (b0 & 0x01) == 1;
-        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
-
-        if (chunkLength > bufferSize) {
-          throw new IllegalArgumentException("Buffer size too small. size = " +
-              bufferSize + " needed = " + chunkLength);
-        }
-        // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
-        assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
-        currentOffset += OutStream.HEADER_SIZE;
-
-        ByteBuffer slice = this.slice(chunkLength);
-
-        if (isOriginal) {
-          uncompressed = slice;
-          isUncompressedOriginal = true;
-        } else {
-          if (isUncompressedOriginal) {
-            allocateForUncompressed(bufferSize, slice.isDirect());
-            isUncompressedOriginal = false;
-          } else if (uncompressed == null) {
-            allocateForUncompressed(bufferSize, slice.isDirect());
-          } else {
-            uncompressed.clear();
-          }
-          codec.decompress(slice, uncompressed);
-         }
-      } else {
-        throw new IllegalStateException("Can't read header at " + this);
-      }
-    }
-
-    @Override
-    public int read() throws IOException {
-      if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == length) {
-          return -1;
-        }
-        readHeader();
-      }
-      return 0xff & uncompressed.get();
-    }
-
-    @Override
-    public int read(byte[] data, int offset, int length) throws IOException {
-      if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == this.length) {
-          return -1;
-        }
-        readHeader();
-      }
-      int actualLength = Math.min(length, uncompressed.remaining());
-      uncompressed.get(data, offset, actualLength);
-      return actualLength;
-    }
-
-    @Override
-    public int available() throws IOException {
-      if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (currentOffset == length) {
-          return 0;
-        }
-        readHeader();
-      }
-      return uncompressed.remaining();
-    }
-
-    @Override
-    public void close() {
-      uncompressed = null;
-      compressed = null;
-      currentRange = bytes.size();
-      currentOffset = length;
-      bytes.clear();
-    }
-
-    @Override
-    public void seek(PositionProvider index) throws IOException {
-      seek(index.getNext());
-      long uncompressedBytes = index.getNext();
-      if (uncompressedBytes != 0) {
-        readHeader();
-        uncompressed.position(uncompressed.position() +
-                              (int) uncompressedBytes);
-      } else if (uncompressed != null) {
-        // mark the uncompressed buffer as done
-        uncompressed.position(uncompressed.limit());
-      }
-    }
-
-    /* slices a read only contiguous buffer of chunkLength */
-    private ByteBuffer slice(int chunkLength) throws IOException {
-      int len = chunkLength;
-      final long oldOffset = currentOffset;
-      ByteBuffer slice;
-      if (compressed.remaining() >= len) {
-        slice = compressed.slice();
-        // simple case
-        slice.limit(len);
-        currentOffset += len;
-        compressed.position(compressed.position() + len);
-        return slice;
-      } else if (currentRange >= (bytes.size() - 1)) {
-        // nothing has been modified yet
-        throw new IOException("EOF in " + this + " while trying to read " +
-            chunkLength + " bytes");
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format(
-            "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
-            compressed.remaining(), len));
-      }
-
-      // we need to consolidate 2 or more buffers into 1
-      // first copy out compressed buffers
-      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
-      currentOffset += compressed.remaining();
-      len -= compressed.remaining();
-      copy.put(compressed);
-      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
-
-      while (len > 0 && iter.hasNext()) {
-        ++currentRange;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
-        }
-        DiskRange range = iter.next();
-        compressed = range.getData().duplicate();
-        if (compressed.remaining() >= len) {
-          slice = compressed.slice();
-          slice.limit(len);
-          copy.put(slice);
-          currentOffset += len;
-          compressed.position(compressed.position() + len);
-          return copy;
-        }
-        currentOffset += compressed.remaining();
-        len -= compressed.remaining();
-        copy.put(compressed);
-      }
-
-      // restore offsets for exception clarity
-      seek(oldOffset);
-      throw new IOException("EOF in " + this + " while trying to read " +
-          chunkLength + " bytes");
-    }
-
-    private void seek(long desired) throws IOException {
-      if (desired == 0 && bytes.isEmpty()) {
-        logEmptySeek(name);
-        return;
-      }
-      int i = 0;
-      for (DiskRange range : bytes) {
-        if (range.getOffset() <= desired && desired < range.getEnd()) {
-          currentRange = i;
-          compressed = range.getData().duplicate();
-          int pos = compressed.position();
-          pos += (int)(desired - range.getOffset());
-          compressed.position(pos);
-          currentOffset = desired;
-          return;
-        }
-        ++i;
-      }
-      // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.size();
-      if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
-        DiskRange range = bytes.get(segments - 1);
-        currentRange = segments - 1;
-        compressed = range.getData().duplicate();
-        compressed.position(compressed.limit());
-        currentOffset = desired;
-        return;
-      }
-      throw new IOException("Seek outside of data in " + this + " to " + desired);
-    }
-
-    private String rangeString() {
-      StringBuilder builder = new StringBuilder();
-      int i = 0;
-      for (DiskRange range : bytes) {
-        if (i != 0) {
-          builder.append("; ");
-        }
-        builder.append(" range " + i + " = " + range.getOffset()
-            + " to " + (range.getEnd() - range.getOffset()));
-        ++i;
-      }
-      return builder.toString();
-    }
-
-    @Override
-    public String toString() {
-      return "compressed stream " + name + " position: " + currentOffset +
-          " length: " + length + " range: " + currentRange +
-          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
-          rangeString() +
-          (uncompressed == null ? "" :
-              " uncompressed: " + uncompressed.position() + " to " +
-                  uncompressed.limit());
-    }
-  }
-
-  public abstract void seek(PositionProvider index) throws IOException;
-
-  private static void logEmptySeek(String name) {
-    if (LOG.isWarnEnabled()) {
-      LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
-    }
-  }
-
-  /**
-   * Create an input stream from a list of buffers.
-   * @param streamName the name of the stream
-   * @param buffers the list of ranges of bytes for the stream
-   * @param offsets a list of offsets (the same length as input) that must
-   *                contain the first offset of the each set of bytes in input
-   * @param length the length in bytes of the stream
-   * @param codec the compression codec
-   * @param bufferSize the compression buffer size
-   * @return an input stream
-   * @throws IOException
-   */
-  @VisibleForTesting
-  @Deprecated
-  public static InStream create(String streamName,
-                                ByteBuffer[] buffers,
-                                long[] offsets,
-                                long length,
-                                CompressionCodec codec,
-                                int bufferSize) throws IOException {
-    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
-    for (int i = 0; i < buffers.length; ++i) {
-      input.add(new BufferChunk(buffers[i], offsets[i]));
-    }
-    return create(streamName, input, length, codec, bufferSize);
-  }
-
-  /**
-   * Create an input stream from a list of disk ranges with data.
-   * @param name the name of the stream
-   * @param input the list of ranges of bytes for the stream; from disk or cache
-   * @param length the length in bytes of the stream
-   * @param codec the compression codec
-   * @param bufferSize the compression buffer size
-   * @return an input stream
-   * @throws IOException
-   */
-  public static InStream create(String name,
-                                List<DiskRange> input,
-                                long length,
-                                CompressionCodec codec,
-                                int bufferSize) throws IOException {
-    if (codec == null) {
-      return new UncompressedStream(name, input, length);
-    } else {
-      return new CompressedStream(name, input, length, codec, bufferSize);
-    }
-  }
-
-  /**
-   * Creates coded input stream (used for protobuf message parsing) with higher message size limit.
-   *
-   * @param name       the name of the stream
-   * @param input      the list of ranges of bytes for the stream; from disk or cache
-   * @param length     the length in bytes of the stream
-   * @param codec      the compression codec
-   * @param bufferSize the compression buffer size
-   * @return coded input stream
-   * @throws IOException
-   */
-  public static CodedInputStream createCodedInputStream(
-      String name,
-      List<DiskRange> input,
-      long length,
-      CompressionCodec codec,
-      int bufferSize) throws IOException {
-    InStream inStream = create(name, input, length, codec, bufferSize);
-    CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
-    codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
-    return codedInputStream;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
deleted file mode 100644
index 3e64d54..0000000
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-
-/**
- * Interface for reading integers.
- */
-public interface IntegerReader {
-
-  /**
-   * Seek to the position provided by index.
-   * @param index
-   * @throws IOException
-   */
-  void seek(PositionProvider index) throws IOException;
-
-  /**
-   * Skip number of specified rows.
-   * @param numValues
-   * @throws IOException
-   */
-  void skip(long numValues) throws IOException;
-
-  /**
-   * Check if there are any more values left.
-   * @return
-   * @throws IOException
-   */
-  boolean hasNext() throws IOException;
-
-  /**
-   * Return the next available value.
-   * @return
-   * @throws IOException
-   */
-  long next() throws IOException;
-
-  /**
-   * Return the next available vector for values.
-   * @param column the column being read
-   * @param data the vector to read into
-   * @param length the number of numbers to read
-   * @throws IOException
-   */
-   void nextVector(ColumnVector column,
-                   long[] data,
-                   int length
-                   ) throws IOException;
-
-  /**
-   * Return the next available vector for values. Does not change the
-   * value of column.isRepeating.
-   * @param column the column being read
-   * @param data the vector to read into
-   * @param length the number of numbers to read
-   * @throws IOException
-   */
-  void nextVector(ColumnVector column,
-                  int[] data,
-                  int length
-                  ) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerWriter.java b/orc/src/java/org/apache/orc/impl/IntegerWriter.java
deleted file mode 100644
index 419054f..0000000
--- a/orc/src/java/org/apache/orc/impl/IntegerWriter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import java.io.IOException;
-
-/**
- * Interface for writing integers.
- */
-public interface IntegerWriter {
-
-  /**
-   * Get position from the stream.
-   * @param recorder
-   * @throws IOException
-   */
-  void getPosition(PositionRecorder recorder) throws IOException;
-
-  /**
-   * Write the integer value
-   * @param value
-   * @throws IOException
-   */
-  void write(long value) throws IOException;
-
-  /**
-   * Flush the buffer
-   * @throws IOException
-   */
-  void flush() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/MemoryManager.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MemoryManager.java b/orc/src/java/org/apache/orc/impl/MemoryManager.java
deleted file mode 100644
index 757c0b4..0000000
--- a/orc/src/java/org/apache/orc/impl/MemoryManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Implements a memory manager that keeps a global context of how many ORC
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- * 
- * This class is not thread safe, but is re-entrant - ensure creation and all
- * invocations are triggered from the same thread.
- */
-public class MemoryManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
-
-  /**
-   * How often should we check the memory sizes? Measured in rows added
-   * to all of the writers.
-   */
-  private static final int ROWS_BETWEEN_CHECKS = 5000;
-  private final long totalMemoryPool;
-  private final Map<Path, WriterInfo> writerList =
-      new HashMap<Path, WriterInfo>();
-  private long totalAllocation = 0;
-  private double currentScale = 1;
-  private int rowsAddedSinceCheck = 0;
-  private final OwnedLock ownerLock = new OwnedLock();
-
-  @SuppressWarnings("serial")
-  private static class OwnedLock extends ReentrantLock {
-    public Thread getOwner() {
-      return super.getOwner();
-    }
-  }
-
-  private static class WriterInfo {
-    long allocation;
-    Callback callback;
-    WriterInfo(long allocation, Callback callback) {
-      this.allocation = allocation;
-      this.callback = callback;
-    }
-  }
-
-  public interface Callback {
-    /**
-     * The writer needs to check its memory usage
-     * @param newScale the current scale factor for memory allocations
-     * @return true if the writer was over the limit
-     * @throws IOException
-     */
-    boolean checkMemory(double newScale) throws IOException;
-  }
-
-  /**
-   * Create the memory manager.
-   * @param conf use the configuration to find the maximum size of the memory
-   *             pool.
-   */
-  public MemoryManager(Configuration conf) {
-    double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
-    totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
-        getHeapMemoryUsage().getMax() * maxLoad);
-    ownerLock.lock();
-  }
-
-  /**
-   * Light weight thread-safety check for multi-threaded access patterns
-   */
-  private void checkOwner() {
-    if (!ownerLock.isHeldByCurrentThread()) {
-      LOG.warn("Owner thread expected {}, got {}",
-          ownerLock.getOwner(), Thread.currentThread());
-    }
-  }
-
-  /**
-   * Add a new writer's memory allocation to the pool. We use the path
-   * as a unique key to ensure that we don't get duplicates.
-   * @param path the file that is being written
-   * @param requestedAllocation the requested buffer size
-   */
-  public void addWriter(Path path, long requestedAllocation,
-                              Callback callback) throws IOException {
-    checkOwner();
-    WriterInfo oldVal = writerList.get(path);
-    // this should always be null, but we handle the case where the memory
-    // manager wasn't told that a writer wasn't still in use and the task
-    // starts writing to the same path.
-    if (oldVal == null) {
-      oldVal = new WriterInfo(requestedAllocation, callback);
-      writerList.put(path, oldVal);
-      totalAllocation += requestedAllocation;
-    } else {
-      // handle a new writer that is writing to the same path
-      totalAllocation += requestedAllocation - oldVal.allocation;
-      oldVal.allocation = requestedAllocation;
-      oldVal.callback = callback;
-    }
-    updateScale(true);
-  }
-
-  /**
-   * Remove the given writer from the pool.
-   * @param path the file that has been closed
-   */
-  public void removeWriter(Path path) throws IOException {
-    checkOwner();
-    WriterInfo val = writerList.get(path);
-    if (val != null) {
-      writerList.remove(path);
-      totalAllocation -= val.allocation;
-      if (writerList.isEmpty()) {
-        rowsAddedSinceCheck = 0;
-      }
-      updateScale(false);
-    }
-    if(writerList.isEmpty()) {
-      rowsAddedSinceCheck = 0;
-    }
-  }
-
-  /**
-   * Get the total pool size that is available for ORC writers.
-   * @return the number of bytes in the pool
-   */
-  public long getTotalMemoryPool() {
-    return totalMemoryPool;
-  }
-
-  /**
-   * The scaling factor for each allocation to ensure that the pool isn't
-   * oversubscribed.
-   * @return a fraction between 0.0 and 1.0 of the requested size that is
-   * available for each writer.
-   */
-  public double getAllocationScale() {
-    return currentScale;
-  }
-
-  /**
-   * Give the memory manager an opportunity for doing a memory check.
-   * @param rows number of rows added
-   * @throws IOException
-   */
-  public void addedRow(int rows) throws IOException {
-    rowsAddedSinceCheck += rows;
-    if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
-      notifyWriters();
-    }
-  }
-
-  /**
-   * Notify all of the writers that they should check their memory usage.
-   * @throws IOException
-   */
-  public void notifyWriters() throws IOException {
-    checkOwner();
-    LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
-    for(WriterInfo writer: writerList.values()) {
-      boolean flushed = writer.callback.checkMemory(currentScale);
-      if (LOG.isDebugEnabled() && flushed) {
-        LOG.debug("flushed " + writer.toString());
-      }
-    }
-    rowsAddedSinceCheck = 0;
-  }
-
-  /**
-   * Update the currentScale based on the current allocation and pool size.
-   * This also updates the notificationTrigger.
-   * @param isAllocate is this an allocation?
-   */
-  private void updateScale(boolean isAllocate) throws IOException {
-    if (totalAllocation <= totalMemoryPool) {
-      currentScale = 1;
-    } else {
-      currentScale = (double) totalMemoryPool / totalAllocation;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
deleted file mode 100644
index 7ca9e1d..0000000
--- a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.Reader;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-
-public class OrcAcidUtils {
-  public static final String ACID_STATS = "hive.acid.stats";
-  public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
-
-  /**
-   * Get the filename of the ORC ACID side file that contains the lengths
-   * of the intermediate footers.
-   * @param main the main ORC filename
-   * @return the name of the side file
-   */
-  public static Path getSideFile(Path main) {
-    return new Path(main + DELTA_SIDE_FILE_SUFFIX);
-  }
-
-  /**
-   * Read the side file to get the last flush length.
-   * @param fs the file system to use
-   * @param deltaFile the path of the delta file
-   * @return the maximum size of the file to use
-   * @throws IOException
-   */
-  public static long getLastFlushLength(FileSystem fs,
-                                        Path deltaFile) throws IOException {
-    Path lengths = getSideFile(deltaFile);
-    long result = Long.MAX_VALUE;
-    if(!fs.exists(lengths)) {
-      return result;
-    }
-    try (FSDataInputStream stream = fs.open(lengths)) {
-      result = -1;
-      while (stream.available() > 0) {
-        result = stream.readLong();
-      }
-      return result;
-    } catch (IOException ioe) {
-      return result;
-    }
-  }
-
-  private static final Charset utf8 = Charset.forName("UTF-8");
-  private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
-
-  public static AcidStats parseAcidStats(Reader reader) {
-    if (reader.hasMetadataValue(ACID_STATS)) {
-      try {
-        ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate();
-        return new AcidStats(utf8Decoder.decode(val).toString());
-      } catch (CharacterCodingException e) {
-        throw new IllegalArgumentException("Bad string encoding for " +
-            ACID_STATS, e);
-      }
-    } else {
-      return null;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcIndex.java b/orc/src/java/org/apache/orc/impl/OrcIndex.java
deleted file mode 100644
index 50a15f2..0000000
--- a/orc/src/java/org/apache/orc/impl/OrcIndex.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.orc.OrcProto;
-
-/**
- * Simple holder that pairs an array of row group indexes with an array of
- * bloom filter indexes. The arrays are stored and returned by reference;
- * no copies are made.
- */
-public final class OrcIndex {
-  OrcProto.RowIndex[] rowGroupIndex;
-  OrcProto.BloomFilterIndex[] bloomFilterIndex;
-
-  /**
-   * Create a holder around the given index arrays.
-   * @param rgIndex the row group indexes
-   * @param bfIndex the bloom filter indexes
-   */
-  public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
-    rowGroupIndex = rgIndex;
-    bloomFilterIndex = bfIndex;
-  }
-
-  /**
-   * @return the row group indexes currently held
-   */
-  public OrcProto.RowIndex[] getRowGroupIndex() {
-    return this.rowGroupIndex;
-  }
-
-  /**
-   * Replace the row group indexes held by this object.
-   * @param rowGroupIndex the new row group indexes
-   */
-  public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
-    this.rowGroupIndex = rowGroupIndex;
-  }
-
-  /**
-   * @return the bloom filter indexes supplied at construction
-   */
-  public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
-    return this.bloomFilterIndex;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcTail.java b/orc/src/java/org/apache/orc/impl/OrcTail.java
deleted file mode 100644
index f095603..0000000
--- a/orc/src/java/org/apache/orc/impl/OrcTail.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import static org.apache.orc.impl.ReaderImpl.extractMetadata;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.OrcProto;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-
-// TODO: Make OrcTail implement FileMetadata or Reader interface
-/**
- * The tail of an ORC file: the parsed {@code FileTail} message together with
- * the serialized bytes it was read from. The metadata section is decoded
- * from the serialized bytes the first time stripe statistics are requested.
- */
-public final class OrcTail {
-  // parsed postscript + footer; this is what gets serialized in OrcSplit
-  private final OrcProto.FileTail fileTail;
-  // raw bytes holding the metadata, footer and postscript
-  private final ByteBuffer serializedTail;
-  // lets a cache detect stale entries
-  private final long fileModificationTime;
-  // decoded on first use by getStripeStatisticsProto()
-  private OrcProto.Metadata metadata;
-
-  /**
-   * Create a tail with a file modification time of -1.
-   * @param fileTail the parsed file tail
-   * @param serializedTail the serialized form of the tail
-   */
-  public OrcTail(OrcProto.FileTail fileTail, ByteBuffer serializedTail) {
-    this(fileTail, serializedTail, -1);
-  }
-
-  /**
-   * @param fileTail the parsed file tail
-   * @param serializedTail the serialized form of the tail
-   * @param fileModificationTime the modification time of the file
-   */
-  public OrcTail(OrcProto.FileTail fileTail, ByteBuffer serializedTail, long fileModificationTime) {
-    this.metadata = null;
-    this.fileTail = fileTail;
-    this.serializedTail = serializedTail;
-    this.fileModificationTime = fileModificationTime;
-  }
-
-  /** @return the parsed file tail this object wraps */
-  public OrcProto.FileTail getFileTail() {
-    return fileTail;
-  }
-
-  /** @return the serialized tail buffer itself (not a copy) */
-  public ByteBuffer getSerializedTail() {
-    return serializedTail;
-  }
-
-  /** @return the modification time given at construction, or -1 if none was */
-  public long getFileModificationTime() {
-    return fileModificationTime;
-  }
-
-  /** @return the footer of the file tail */
-  public OrcProto.Footer getFooter() {
-    return fileTail.getFooter();
-  }
-
-  /** @return the postscript of the file tail */
-  public OrcProto.PostScript getPostScript() {
-    return fileTail.getPostscript();
-  }
-
-  /** @return the type list recorded in the footer */
-  public List<OrcProto.Type> getTypes() {
-    return getFooter().getTypesList();
-  }
-
-  /** @return the metadata length recorded in the postscript, as an int */
-  public int getMetadataSize() {
-    return (int) getPostScript().getMetadataLength();
-  }
-
-  /**
-   * @return the writer version from the postscript, or ORIGINAL when the
-   *         postscript does not carry one
-   */
-  public OrcFile.WriterVersion getWriterVersion() {
-    final OrcProto.PostScript postScript = fileTail.getPostscript();
-    if (postScript.hasWriterVersion()) {
-      return OrcFile.WriterVersion.from(postScript.getWriterVersion());
-    }
-    return OrcFile.WriterVersion.ORIGINAL;
-  }
-
-  /**
-   * @return a new list with one StripeInformation per stripe in the footer
-   */
-  public List<StripeInformation> getStripes() {
-    final OrcProto.Footer footer = fileTail.getFooter();
-    final List<StripeInformation> stripes = new ArrayList<>(footer.getStripesCount());
-    for (OrcProto.StripeInformation proto : footer.getStripesList()) {
-      stripes.add(new ReaderImpl.StripeInformationImpl(proto));
-    }
-    return stripes;
-  }
-
-  /** @return the compression kind named by the postscript */
-  public CompressionKind getCompressionKind() {
-    final String kindName = fileTail.getPostscript().getCompression().name();
-    return CompressionKind.valueOf(kindName);
-  }
-
-  /** @return a codec created for this file's compression kind */
-  public CompressionCodec getCompressionCodec() {
-    final CompressionKind kind = getCompressionKind();
-    return PhysicalFsWriter.createCodec(kind);
-  }
-
-  /** @return the compression block size from the postscript, as an int */
-  public int getCompressionBufferSize() {
-    final long blockSize = fileTail.getPostscript().getCompressionBlockSize();
-    return (int) blockSize;
-  }
-
-  /**
-   * @return the stripe statistics; empty when no serialized tail is available
-   * @throws IOException if the metadata section cannot be extracted
-   */
-  public List<StripeStatistics> getStripeStatistics() throws IOException {
-    final List<StripeStatistics> converted = new ArrayList<>();
-    final List<OrcProto.StripeStatistics> protos = getStripeStatisticsProto();
-    if (protos == null) {
-      return converted;
-    }
-    for (OrcProto.StripeStatistics proto : protos) {
-      converted.add(new StripeStatistics(proto.getColStatsList()));
-    }
-    return converted;
-  }
-
-  /**
-   * Return the stripe statistics in protobuf form, extracting the metadata
-   * section from the serialized tail on the first call.
-   * @return the stripe statistics, or null when there is no serialized tail
-   * @throws IOException if the metadata section cannot be extracted
-   */
-  public List<OrcProto.StripeStatistics> getStripeStatisticsProto() throws IOException {
-    if (serializedTail == null) {
-      return null;
-    }
-    if (metadata == null) {
-      final int metadataLength = (int) fileTail.getPostscript().getMetadataLength();
-      metadata = extractMetadata(serializedTail, 0, metadataLength,
-          getCompressionCodec(), getCompressionBufferSize());
-      // clear() keeps the contents; it only rewinds position to 0 and
-      // restores limit to capacity
-      serializedTail.clear();
-    }
-    return metadata.getStripeStatsList();
-  }
-
-  /**
-   * @return a copy of the file tail whose footer has its statistics cleared
-   */
-  public OrcProto.FileTail getMinimalFileTail() {
-    final OrcProto.FileTail.Builder tailBuilder = OrcProto.FileTail.newBuilder(fileTail);
-    final OrcProto.Footer.Builder trimmedFooter =
-        OrcProto.Footer.newBuilder(fileTail.getFooter());
-    trimmedFooter.clearStatistics();
-    tailBuilder.setFooter(trimmedFooter.build());
-    return tailBuilder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OutStream.java b/orc/src/java/org/apache/orc/impl/OutStream.java
deleted file mode 100644
index 81662cc..0000000
--- a/orc/src/java/org/apache/orc/impl/OutStream.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import org.apache.orc.CompressionCodec;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * An output stream that collects written bytes into buffers of
- * {@code bufferSize} bytes and hands them to an {@link OutputReceiver}.
- * When a codec is supplied, each full buffer is run through the codec and
- * emitted as a chunk preceded by a {@link #HEADER_SIZE}-byte header; when the
- * codec is null the raw buffers are passed to the receiver unchanged.
- */
-public class OutStream extends PositionedOutputStream {
-
-  /**
-   * Destination for the buffers produced by this stream.
-   */
-  public interface OutputReceiver {
-    /**
-     * Output the given buffer to the final destination
-     * @param buffer the buffer to output
-     * @throws IOException
-     */
-    void output(ByteBuffer buffer) throws IOException;
-  }
-
-  /** Number of bytes in the chunk header written by writeHeader. */
-  public static final int HEADER_SIZE = 3;
-  // name of the stream; returned by toString()
-  private final String name;
-  // where finished buffers are sent
-  private final OutputReceiver receiver;
-  // if enabled the stream will be suppressed when writing stripe
-  private boolean suppress;
-
-  /**
-   * Stores the uncompressed bytes that have been serialized, but not
-   * compressed yet. When this fills, we compress the entire buffer.
-   */
-  private ByteBuffer current = null;
-
-  /**
-   * Stores the compressed bytes until we have a full buffer and then outputs
-   * them to the receiver. If no compression is being done, this (and overflow)
-   * will always be null and the current buffer will be sent directly to the
-   * receiver.
-   */
-  private ByteBuffer compressed = null;
-
-  /**
-   * Since the compressed buffer may start with contents from previous
-   * compression blocks, we allocate an overflow buffer so that the
-   * output of the codec can be split between the two buffers. After the
-   * compressed buffer is sent to the receiver, the overflow buffer becomes
-   * the new compressed buffer.
-   */
-  private ByteBuffer overflow = null;
-  // payload capacity of each buffer, excluding the header
-  private final int bufferSize;
-  // codec used to compress chunks; null means no compression
-  private final CompressionCodec codec;
-  // bytes (including headers) of the chunks completed since the last flush
-  private long compressedBytes = 0;
-  // with a codec: bytes written into the chunk currently being filled;
-  // without a codec: bytes written since the last flush
-  private long uncompressedBytes = 0;
-
-  /**
-   * Create a stream. No buffers are allocated until the first write.
-   * @param name the name reported by toString()
-   * @param bufferSize the payload size of each buffer
-   * @param codec the codec to compress with, or null for no compression
-   * @param receiver where the finished buffers are sent
-   * @throws IOException declared in the signature; the constructor body only
-   *         assigns fields
-   */
-  public OutStream(String name,
-                   int bufferSize,
-                   CompressionCodec codec,
-                   OutputReceiver receiver) throws IOException {
-    this.name = name;
-    this.bufferSize = bufferSize;
-    this.codec = codec;
-    this.receiver = receiver;
-    this.suppress = false;
-  }
-
-  /**
-   * Flush any buffered data to the receiver and reset the suppress flag.
-   * @throws IOException if flushing fails
-   */
-  public void clear() throws IOException {
-    flush();
-    suppress = false;
-  }
-
-  /**
-   * Write the length of the compressed bytes. Life is much easier if the
-   * header is constant length, so just use 3 bytes. Considering most of the
-   * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
-   * be plenty. We also use the low bit for whether it is the original or
-   * compressed bytes.
-   * @param buffer the buffer to write the header to
-   * @param position the position in the buffer to write at
-   * @param val the size in the file
-   * @param original is it uncompressed
-   */
-  private static void writeHeader(ByteBuffer buffer,
-                                  int position,
-                                  int val,
-                                  boolean original) {
-    // byte 0: low 7 bits of val shifted up by one, with the original flag
-    // in bit 0
-    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
-    // byte 1: bits 7..14 of val
-    buffer.put(position + 1, (byte) (val >> 7));
-    // byte 2: bits 15..22 of val
-    buffer.put(position + 2, (byte) (val >> 15));
-  }
-
-  /**
-   * Allocate a fresh buffer for incoming bytes. With a codec, room for the
-   * header is reserved at the front and pre-filled with an "original" header
-   * for a full buffer, and the position is left just past it.
-   */
-  private void getNewInputBuffer() throws IOException {
-    if (codec == null) {
-      current = ByteBuffer.allocate(bufferSize);
-    } else {
-      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
-      writeHeader(current, 0, bufferSize, true);
-      current.position(HEADER_SIZE);
-    }
-  }
-
-  /**
-   * Allocate a new output buffer if we are compressing.
-   */
-  private ByteBuffer getNewOutputBuffer() throws IOException {
-    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
-  }
-
-  /**
-   * Prepare the current buffer for reading: the limit becomes the write
-   * position and the position moves to the first data byte (after the header
-   * when a codec is in use).
-   */
-  private void flip() throws IOException {
-    current.limit(current.position());
-    current.position(codec == null ? 0 : HEADER_SIZE);
-  }
-
-  /**
-   * Write a single byte, spilling the current buffer first if it is full.
-   * @param i the byte to write (only the low 8 bits are stored)
-   */
-  @Override
-  public void write(int i) throws IOException {
-    if (current == null) {
-      getNewInputBuffer();
-    }
-    if (current.remaining() < 1) {
-      spill();
-    }
-    uncompressedBytes += 1;
-    current.put((byte) i);
-  }
-
-  /**
-   * Write a range of bytes, spilling the current buffer each time it fills
-   * until the whole range has been consumed.
-   * @param bytes the array to copy from
-   * @param offset the index of the first byte to write
-   * @param length the number of bytes to write
-   */
-  @Override
-  public void write(byte[] bytes, int offset, int length) throws IOException {
-    if (current == null) {
-      getNewInputBuffer();
-    }
-    // first fill whatever space is left in the current buffer
-    int remaining = Math.min(current.remaining(), length);
-    current.put(bytes, offset, remaining);
-    uncompressedBytes += remaining;
-    length -= remaining;
-    while (length != 0) {
-      spill();
-      // advance past the bytes copied in the previous round
-      offset += remaining;
-      remaining = Math.min(current.remaining(), length);
-      current.put(bytes, offset, remaining);
-      uncompressedBytes += remaining;
-      length -= remaining;
-    }
-  }
-
-  /**
-   * Move the contents of the current buffer towards the receiver. Without a
-   * codec the buffer is output directly. With a codec the buffer is
-   * compressed into the compressed/overflow buffers; if the codec returns
-   * false the bytes are output in their original form instead, after any
-   * previously compressed chunks have been output.
-   */
-  private void spill() throws java.io.IOException {
-    // if there isn't anything in the current buffer, don't spill
-    if (current == null ||
-        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
-      return;
-    }
-    flip();
-    if (codec == null) {
-      receiver.output(current);
-      getNewInputBuffer();
-    } else {
-      if (compressed == null) {
-        compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
-        overflow = getNewOutputBuffer();
-      }
-      // remember where this chunk's header goes and skip over it; the
-      // header is filled in once the compressed size is known
-      int sizePosn = compressed.position();
-      compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow)) {
-        uncompressedBytes = 0;
-        // move position back to after the header
-        current.position(HEADER_SIZE);
-        current.limit(current.capacity());
-        // find the total bytes in the chunk
-        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
-        if (overflow != null) {
-          totalBytes += overflow.position();
-        }
-        compressedBytes += totalBytes + HEADER_SIZE;
-        writeHeader(compressed, sizePosn, totalBytes, false);
-        // if we have less than the next header left, spill it.
-        if (compressed.remaining() < HEADER_SIZE) {
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = overflow;
-          overflow = null;
-        }
-      } else {
-        compressedBytes += uncompressedBytes + HEADER_SIZE;
-        uncompressedBytes = 0;
-        // we are using the original, but need to spill the current
-        // compressed buffer first. So back up to where we started,
-        // flip it and add it to done.
-        if (sizePosn != 0) {
-          compressed.position(sizePosn);
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = null;
-          // if we have an overflow, clear it and make it the new compress
-          // buffer
-          if (overflow != null) {
-            overflow.clear();
-            compressed = overflow;
-            overflow = null;
-          }
-        } else {
-          // nothing earlier is pending in the compressed buffer, so just
-          // reset the buffers for reuse
-          compressed.clear();
-          if (overflow != null) {
-            overflow.clear();
-          }
-        }
-
-        // now add the current buffer into the done list and get a new one.
-        current.position(0);
-        // update the header with the current length
-        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
-        receiver.output(current);
-        getNewInputBuffer();
-      }
-    }
-  }
-
-  /**
-   * Record the current position of the stream: the uncompressed byte count
-   * when there is no codec, otherwise the compressed byte count followed by
-   * the uncompressed byte count.
-   * @param recorder the recorder to add the position values to
-   */
-  @Override
-  public void getPosition(PositionRecorder recorder) throws IOException {
-    if (codec == null) {
-      recorder.addPosition(uncompressedBytes);
-    } else {
-      recorder.addPosition(compressedBytes);
-      recorder.addPosition(uncompressedBytes);
-    }
-  }
-
-  /**
-   * Spill the current buffer, output whatever is left in the compressed
-   * buffer, then release all buffers and reset the byte counters.
-   */
-  @Override
-  public void flush() throws IOException {
-    spill();
-    if (compressed != null && compressed.position() != 0) {
-      compressed.flip();
-      receiver.output(compressed);
-    }
-    compressed = null;
-    uncompressedBytes = 0;
-    compressedBytes = 0;
-    overflow = null;
-    current = null;
-  }
-
-  /**
-   * @return the name given to this stream at construction
-   */
-  @Override
-  public String toString() {
-    return name;
-  }
-
-  /**
-   * @return the combined capacity of the buffers currently allocated
-   *         (current, compressed and overflow)
-   */
-  @Override
-  public long getBufferSize() {
-    long result = 0;
-    if (current != null) {
-      result += current.capacity();
-    }
-    if (compressed != null) {
-      result += compressed.capacity();
-    }
-    if (overflow != null) {
-      result += overflow.capacity();
-    }
-    return result;
-  }
-
-  /**
-   * Set suppress flag
-   */
-  public void suppress() {
-    suppress = true;
-  }
-
-  /**
-   * Returns the state of suppress flag
-   * @return value of suppress flag
-   */
-  public boolean isSuppressed() {
-    return suppress;
-  }
-}
-