Posted to commits@orc.apache.org by om...@apache.org on 2016/05/13 19:50:48 UTC
[19/23] orc git commit: ORC-1 Import of ORC code from Hive. (omalley
reviewed by prasanthj)
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
new file mode 100644
index 0000000..bb73d53
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -0,0 +1,107 @@
+package org.apache.orc.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionKind;
+
+import javax.annotation.Nullable;
+
+public final class DataReaderProperties {
+
+ private final FileSystem fileSystem;
+ private final Path path;
+ private final CompressionKind compression;
+ private final boolean zeroCopy;
+ private final int typeCount;
+ private final int bufferSize;
+
+ private DataReaderProperties(Builder builder) {
+ this.fileSystem = builder.fileSystem;
+ this.path = builder.path;
+ this.compression = builder.compression;
+ this.zeroCopy = builder.zeroCopy;
+ this.typeCount = builder.typeCount;
+ this.bufferSize = builder.bufferSize;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public CompressionKind getCompression() {
+ return compression;
+ }
+
+ public boolean getZeroCopy() {
+ return zeroCopy;
+ }
+
+ public int getTypeCount() {
+ return typeCount;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private FileSystem fileSystem;
+ private Path path;
+ private CompressionKind compression;
+ private boolean zeroCopy;
+ private int typeCount;
+ private int bufferSize;
+
+ private Builder() {
+
+ }
+
+ public Builder withFileSystem(FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ return this;
+ }
+
+ public Builder withPath(Path path) {
+ this.path = path;
+ return this;
+ }
+
+ public Builder withCompression(CompressionKind value) {
+ this.compression = value;
+ return this;
+ }
+
+ public Builder withZeroCopy(boolean zeroCopy) {
+ this.zeroCopy = zeroCopy;
+ return this;
+ }
+
+ public Builder withTypeCount(int value) {
+ this.typeCount = value;
+ return this;
+ }
+
+ public Builder withBufferSize(int value) {
+ this.bufferSize = value;
+ return this;
+ }
+
+ public DataReaderProperties build() {
+ Preconditions.checkNotNull(fileSystem);
+ Preconditions.checkNotNull(path);
+
+ return new DataReaderProperties(this);
+ }
+
+ }
+}
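
A minimal sketch of how a caller would assemble these properties (the file
system, path, and sizes below are illustrative, not part of this commit).
Only fileSystem and path are mandatory, since build() enforces them with
Preconditions.checkNotNull:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.DataReaderProperties;

    // Build reader properties; unset fields default to 0/false/null.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    DataReaderProperties props = DataReaderProperties.builder()
        .withFileSystem(fs)
        .withPath(new Path("/tmp/example.orc"))
        .withCompression(CompressionKind.ZLIB)
        .withZeroCopy(false)
        .withTypeCount(10)
        .withBufferSize(256 * 1024)
        .build();
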
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java b/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
new file mode 100644
index 0000000..7e0110d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface DirectDecompressionCodec extends CompressionCodec {
+ public boolean isAvailable();
+ public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java b/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java
new file mode 100644
index 0000000..986c2ac
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * A class that is a growable array of bytes. Growth is managed in terms of
+ * chunks that are allocated when needed.
+ */
+public final class DynamicByteArray {
+ static final int DEFAULT_CHUNKSIZE = 32 * 1024;
+ static final int DEFAULT_NUM_CHUNKS = 128;
+
+ private final int chunkSize; // our allocation sizes
+ private byte[][] data; // the real data
+ private int length; // max set element index +1
+ private int initializedChunks = 0; // the number of chunks created
+
+ public DynamicByteArray() {
+ this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
+ }
+
+ public DynamicByteArray(int numChunks, int chunkSize) {
+ if (chunkSize <= 0) {
+ throw new IllegalArgumentException("bad chunksize");
+ }
+ this.chunkSize = chunkSize;
+ data = new byte[numChunks][];
+ }
+
+ /**
+ * Ensure that the given index is valid.
+ */
+ private void grow(int chunkIndex) {
+ if (chunkIndex >= initializedChunks) {
+ if (chunkIndex >= data.length) {
+ int newSize = Math.max(chunkIndex + 1, 2 * data.length);
+ byte[][] newChunk = new byte[newSize][];
+ System.arraycopy(data, 0, newChunk, 0, data.length);
+ data = newChunk;
+ }
+ for(int i=initializedChunks; i <= chunkIndex; ++i) {
+ data[i] = new byte[chunkSize];
+ }
+ initializedChunks = chunkIndex + 1;
+ }
+ }
+
+ public byte get(int index) {
+ if (index >= length) {
+ throw new IndexOutOfBoundsException("Index " + index +
+ " is outside of 0.." +
+ (length - 1));
+ }
+ int i = index / chunkSize;
+ int j = index % chunkSize;
+ return data[i][j];
+ }
+
+ public void set(int index, byte value) {
+ int i = index / chunkSize;
+ int j = index % chunkSize;
+ grow(i);
+ if (index >= length) {
+ length = index + 1;
+ }
+ data[i][j] = value;
+ }
+
+ public int add(byte value) {
+ int i = length / chunkSize;
+ int j = length % chunkSize;
+ grow(i);
+ data[i][j] = value;
+ int result = length;
+ length += 1;
+ return result;
+ }
+
+ /**
+ * Copy a slice of a byte array into our buffer.
+ * @param value the array to copy from
+ * @param valueOffset the first location to copy from value
+ * @param valueLength the number of bytes to copy from value
+ * @return the offset of the start of the value
+ */
+ public int add(byte[] value, int valueOffset, int valueLength) {
+ int i = length / chunkSize;
+ int j = length % chunkSize;
+ grow((length + valueLength) / chunkSize);
+ int remaining = valueLength;
+ while (remaining > 0) {
+ int size = Math.min(remaining, chunkSize - j);
+ System.arraycopy(value, valueOffset, data[i], j, size);
+ remaining -= size;
+ valueOffset += size;
+ i += 1;
+ j = 0;
+ }
+ int result = length;
+ length += valueLength;
+ return result;
+ }
+
+ /**
+ * Read the entire stream into this array.
+ * @param in the stream to read from
+ * @throws IOException
+ */
+ public void readAll(InputStream in) throws IOException {
+ int currentChunk = length / chunkSize;
+ int currentOffset = length % chunkSize;
+ grow(currentChunk);
+ int currentLength = in.read(data[currentChunk], currentOffset,
+ chunkSize - currentOffset);
+ while (currentLength > 0) {
+ length += currentLength;
+ currentOffset = length % chunkSize;
+ if (currentOffset == 0) {
+ currentChunk = length / chunkSize;
+ grow(currentChunk);
+ }
+ currentLength = in.read(data[currentChunk], currentOffset,
+ chunkSize - currentOffset);
+ }
+ }
+
+ /**
+ * Byte compare a set of bytes against the bytes in this dynamic array.
+ * @param other source of the other bytes
+ * @param otherOffset start offset in the other array
+ * @param otherLength number of bytes in the other array
+ * @param ourOffset the offset in our array
+ * @param ourLength the number of bytes in our array
+ * @return negative for less, 0 for equal, positive for greater
+ */
+ public int compare(byte[] other, int otherOffset, int otherLength,
+ int ourOffset, int ourLength) {
+ int currentChunk = ourOffset / chunkSize;
+ int currentOffset = ourOffset % chunkSize;
+ int maxLength = Math.min(otherLength, ourLength);
+ while (maxLength > 0 &&
+ other[otherOffset] == data[currentChunk][currentOffset]) {
+ otherOffset += 1;
+ currentOffset += 1;
+ if (currentOffset == chunkSize) {
+ currentChunk += 1;
+ currentOffset = 0;
+ }
+ maxLength -= 1;
+ }
+ if (maxLength == 0) {
+ return otherLength - ourLength;
+ }
+ int otherByte = 0xff & other[otherOffset];
+ int ourByte = 0xff & data[currentChunk][currentOffset];
+ return otherByte > ourByte ? 1 : -1;
+ }
+
+ /**
+ * Get the size of the array.
+ * @return the number of bytes in the array
+ */
+ public int size() {
+ return length;
+ }
+
+ /**
+ * Clear the array to its original pristine state.
+ */
+ public void clear() {
+ length = 0;
+ for(int i=0; i < data.length; ++i) {
+ data[i] = null;
+ }
+ initializedChunks = 0;
+ }
+
+ /**
+ * Set a text value from the bytes in this dynamic array.
+ * @param result the value to set
+ * @param offset the start of the bytes to copy
+ * @param length the number of bytes to copy
+ */
+ public void setText(Text result, int offset, int length) {
+ result.clear();
+ int currentChunk = offset / chunkSize;
+ int currentOffset = offset % chunkSize;
+ int currentLength = Math.min(length, chunkSize - currentOffset);
+ while (length > 0) {
+ result.append(data[currentChunk], currentOffset, currentLength);
+ length -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(length, chunkSize - currentOffset);
+ }
+ }
+
+ /**
+ * Write out a range of this dynamic array to an output stream.
+ * @param out the stream to write to
+ * @param offset the first offset to write
+ * @param length the number of bytes to write
+ * @throws IOException
+ */
+ public void write(OutputStream out, int offset,
+ int length) throws IOException {
+ int currentChunk = offset / chunkSize;
+ int currentOffset = offset % chunkSize;
+ while (length > 0) {
+ int currentLength = Math.min(length, chunkSize - currentOffset);
+ out.write(data[currentChunk], currentOffset, currentLength);
+ length -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(length * 3);
+
+ sb.append('{');
+ for (int i = 0; i < length; i++) {
+ if (i != 0) {
+ sb.append(',');
+ }
+ sb.append(Integer.toHexString(get(i)));
+ }
+ sb.append('}');
+
+ return sb.toString();
+ }
+
+ public void setByteBuffer(ByteBuffer result, int offset, int length) {
+ result.clear();
+ int currentChunk = offset / chunkSize;
+ int currentOffset = offset % chunkSize;
+ int currentLength = Math.min(length, chunkSize - currentOffset);
+ while (length > 0) {
+ result.put(data[currentChunk], currentOffset, currentLength);
+ length -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(length, chunkSize - currentOffset);
+ }
+ }
+
+ /**
+ * Gets all the bytes of the array.
+ *
+ * @return Bytes of the array
+ */
+ public byte[] get() {
+ byte[] result = null;
+ if (length > 0) {
+ int currentChunk = 0;
+ int currentOffset = 0;
+ int currentLength = Math.min(length, chunkSize);
+ int destOffset = 0;
+ result = new byte[length];
+ int totalLength = length;
+ while (totalLength > 0) {
+ System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+ destOffset += currentLength;
+ totalLength -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(totalLength, chunkSize - currentOffset);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get the size of the buffers.
+ */
+ public long getSizeInBytes() {
+ return (long) initializedChunks * chunkSize;
+ }
+}
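
The chunked addressing above puts byte i at data[i / chunkSize][i % chunkSize],
so growing the array allocates fresh chunks and at worst copies the small array
of chunk references, never the payload bytes. A short sketch of the API under
the defaults (128 chunks of 32K):

    DynamicByteArray buf = new DynamicByteArray();
    int start = buf.add(new byte[]{1, 2, 3}, 0, 3);  // bulk append, returns offset 0
    buf.add((byte) 4);                               // single-byte append at offset 3
    assert buf.get(start + 1) == 2;
    assert buf.size() == 4;
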
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java b/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java
new file mode 100644
index 0000000..3b2884b
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+/**
+ * Dynamic int array that uses primitive types and chunks to avoid copying
+ * large numbers of integers when it resizes.
+ *
+ * The motivation for this class is memory optimization, i.e. space efficient
+ * storage of potentially huge arrays without good a-priori size guesses.
+ *
+ * The API of this class is between a primitive array and an AbstractList. It's
+ * not a Collection implementation because it handles primitive types, but the
+ * API could be extended to support iterators and the like.
+ *
+ * NOTE: Like standard Collection implementations/arrays, this class is not
+ * synchronized.
+ */
+public final class DynamicIntArray {
+ static final int DEFAULT_CHUNKSIZE = 8 * 1024;
+ static final int INIT_CHUNKS = 128;
+
+ private final int chunkSize; // our allocation size
+ private int[][] data; // the real data
+ private int length; // max set element index +1
+ private int initializedChunks = 0; // the number of created chunks
+
+ public DynamicIntArray() {
+ this(DEFAULT_CHUNKSIZE);
+ }
+
+ public DynamicIntArray(int chunkSize) {
+ this.chunkSize = chunkSize;
+
+ data = new int[INIT_CHUNKS][];
+ }
+
+ /**
+ * Ensure that the given index is valid.
+ */
+ private void grow(int chunkIndex) {
+ if (chunkIndex >= initializedChunks) {
+ if (chunkIndex >= data.length) {
+ int newSize = Math.max(chunkIndex + 1, 2 * data.length);
+ int[][] newChunk = new int[newSize][];
+ System.arraycopy(data, 0, newChunk, 0, data.length);
+ data = newChunk;
+ }
+ for (int i=initializedChunks; i <= chunkIndex; ++i) {
+ data[i] = new int[chunkSize];
+ }
+ initializedChunks = chunkIndex + 1;
+ }
+ }
+
+ public int get(int index) {
+ if (index >= length) {
+ throw new IndexOutOfBoundsException("Index " + index +
+ " is outside of 0.." +
+ (length - 1));
+ }
+ int i = index / chunkSize;
+ int j = index % chunkSize;
+ return data[i][j];
+ }
+
+ public void set(int index, int value) {
+ int i = index / chunkSize;
+ int j = index % chunkSize;
+ grow(i);
+ if (index >= length) {
+ length = index + 1;
+ }
+ data[i][j] = value;
+ }
+
+ public void increment(int index, int value) {
+ int i = index / chunkSize;
+ int j = index % chunkSize;
+ grow(i);
+ if (index >= length) {
+ length = index + 1;
+ }
+ data[i][j] += value;
+ }
+
+ public void add(int value) {
+ int i = length / chunkSize;
+ int j = length % chunkSize;
+ grow(i);
+ data[i][j] = value;
+ length += 1;
+ }
+
+ public int size() {
+ return length;
+ }
+
+ public void clear() {
+ length = 0;
+ for(int i=0; i < data.length; ++i) {
+ data[i] = null;
+ }
+ initializedChunks = 0;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder(length * 4);
+
+ sb.append('{');
+ for (int i = 0; i < length; i++) {
+ if (i != 0) {
+ sb.append(',');
+ }
+ sb.append(get(i));
+ }
+ sb.append('}');
+
+ return sb.toString();
+ }
+
+ public int getSizeInBytes() {
+ return 4 * initializedChunks * chunkSize;
+ }
+}
+
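
The same chunking scheme, specialized for primitive ints; a short sketch of
the set/increment semantics (the default chunk holds 8K ints, so index 10000
lands in the second chunk):

    DynamicIntArray counts = new DynamicIntArray();
    counts.add(5);            // counts[0] = 5, length = 1
    counts.increment(0, 2);   // counts[0] = 7
    counts.set(10000, 1);     // grows through chunk 1; length becomes 10001
    assert counts.get(0) == 7 && counts.size() == 10001;
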
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims.java b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
new file mode 100644
index 0000000..ef7d70f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.VersionInfo;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public interface HadoopShims {
+
+ enum DirectCompressionType {
+ NONE,
+ ZLIB_NOHEADER,
+ ZLIB,
+ SNAPPY,
+ }
+
+ interface DirectDecompressor {
+ void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException;
+ }
+
+ /**
+ * Get a direct decompressor, if one is available for the codec.
+ * @param codec the kind of compression to decompress
+ * @return the direct decompressor, or null if none is available
+ */
+ DirectDecompressor getDirectDecompressor(DirectCompressionType codec);
+
+ /**
+ * a hadoop.io ByteBufferPool shim.
+ */
+ public interface ByteBufferPoolShim {
+ /**
+ * Get a new ByteBuffer from the pool. The pool can provide this by
+ * removing a buffer from its internal cache, or by allocating a
+ * new buffer.
+ *
+ * @param direct Whether the buffer should be direct.
+ * @param length The minimum length the buffer will have.
+ * @return A new ByteBuffer. Its capacity can be less
+ * than what was requested, but must be at
+ * least 1 byte.
+ */
+ ByteBuffer getBuffer(boolean direct, int length);
+
+ /**
+ * Release a buffer back to the pool.
+ * The pool may choose to put this buffer into its cache or free it.
+ *
+ * @param buffer a direct bytebuffer
+ */
+ void putBuffer(ByteBuffer buffer);
+ }
+
+ /**
+ * Provides an HDFS ZeroCopyReader shim.
+ * @param in the FSDataInputStream to read from (the stream the cached/mmap buffers are tied to)
+ * @param pool the ByteBufferPoolShim to allocate fallback buffers with
+ *
+ * @return the reader shim, or null if zero-copy reads are not supported
+ */
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
+
+ public interface ZeroCopyReaderShim extends Closeable {
+ /**
+ * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
+ * Also advances the stream position by the number of bytes read. The data read can be smaller than maxLength.
+ *
+ * @return the ByteBuffer read from the stream
+ */
+ public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
+ /**
+ * Release a ByteBuffer obtained from a read on the underlying stream.
+ * @param buffer the buffer to release
+ */
+ public void releaseBuffer(ByteBuffer buffer);
+
+ /**
+ * Close the underlying stream.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+ }
+ /**
+ * Read data into a Text object in the fastest way possible
+ */
+ public interface TextReaderShim {
+ /**
+ * Read size bytes from the stream into txt.
+ * @param txt the Text object to read into
+ * @param size the number of bytes to read
+ * @throws IOException
+ */
+ void read(Text txt, int size) throws IOException;
+ }
+
+ /**
+ * Wrap a TextReaderShim around an input stream. The reader shim will not
+ * buffer any reads from the underlying stream and will only consume bytes
+ * which are required for TextReaderShim.read() input.
+ */
+ public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
+
+ class Factory {
+ private static HadoopShims SHIMS = null;
+
+ public static synchronized HadoopShims get() {
+ if (SHIMS == null) {
+ String[] versionParts = VersionInfo.getVersion().split("[.]");
+ int major = Integer.parseInt(versionParts[0]);
+ int minor = Integer.parseInt(versionParts[1]);
+ if (major < 2 || (major == 2 && minor < 3)) {
+ SHIMS = new HadoopShims_2_2();
+ } else {
+ SHIMS = new HadoopShimsCurrent();
+ }
+ }
+ return SHIMS;
+ }
+ }
+}
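
Factory.get() parses VersionInfo.getVersion() (for example "2.7.1" yields
major 2, minor 7) and hands back the 2.2 shim only for Hadoop older than 2.3.
A sketch of the expected call pattern; the null check is how callers discover
a missing capability:

    HadoopShims shims = HadoopShims.Factory.get();
    HadoopShims.DirectDecompressor zlib =
        shims.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB);
    if (zlib == null) {
      // Hadoop <= 2.2, or an unsupported codec: fall back to stream decompression
    }
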
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
new file mode 100644
index 0000000..5c53f74
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Shims for recent versions of Hadoop
+ */
+public class HadoopShimsCurrent implements HadoopShims {
+
+ private static class DirectDecompressWrapper implements DirectDecompressor {
+ private final org.apache.hadoop.io.compress.DirectDecompressor root;
+
+ DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) {
+ this.root = root;
+ }
+
+ public void decompress(ByteBuffer input,
+ ByteBuffer output) throws IOException {
+ root.decompress(input, output);
+ }
+ }
+
+ public DirectDecompressor getDirectDecompressor(
+ DirectCompressionType codec) {
+ switch (codec) {
+ case ZLIB:
+ return new DirectDecompressWrapper
+ (new ZlibDecompressor.ZlibDirectDecompressor());
+ case ZLIB_NOHEADER:
+ return new DirectDecompressWrapper
+ (new ZlibDecompressor.ZlibDirectDecompressor
+ (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
+ case SNAPPY:
+ return new DirectDecompressWrapper
+ (new SnappyDecompressor.SnappyDirectDecompressor());
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+
+ private final class FastTextReaderShim implements TextReaderShim {
+ private final DataInputStream din;
+
+ public FastTextReaderShim(InputStream in) {
+ this.din = new DataInputStream(in);
+ }
+
+ @Override
+ public void read(Text txt, int len) throws IOException {
+ txt.readWithKnownLength(din, len);
+ }
+ }
+
+ @Override
+ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+ return new FastTextReaderShim(in);
+ }
+
+}
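
A minimal round-trip sketch for the wrapper, assuming the Hadoop native zlib
library is loaded; java.util.zip.Deflater produces zlib-framed data that
DirectCompressionType.ZLIB decodes, and direct ByteBuffers are used since that
is the point of direct decompression:

    byte[] raw = "hello orc".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    java.util.zip.Deflater deflater = new java.util.zip.Deflater(); // zlib framing
    deflater.setInput(raw);
    deflater.finish();
    byte[] zipped = new byte[64];
    int n = deflater.deflate(zipped);

    java.nio.ByteBuffer in = java.nio.ByteBuffer.allocateDirect(n);
    in.put(zipped, 0, n);
    in.flip();
    java.nio.ByteBuffer out = java.nio.ByteBuffer.allocateDirect(64);
    new HadoopShimsCurrent()
        .getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB)
        .decompress(in, out);
    out.flip(); // make the decompressed bytes ("hello orc") readable
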
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
new file mode 100644
index 0000000..3f65e74
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.Text;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+
+/**
+ * Shims for versions of Hadoop up to and including 2.2.x
+ */
+public class HadoopShims_2_2 implements HadoopShims {
+
+ final boolean zeroCopy;
+ final boolean fastRead;
+
+ HadoopShims_2_2() {
+ boolean zcr = false;
+ try {
+ Class.forName("org.apache.hadoop.fs.CacheFlag", false,
+ HadoopShims_2_2.class.getClassLoader());
+ zcr = true;
+ } catch (ClassNotFoundException ce) {
+ // CacheFlag is absent on this Hadoop version, so zero-copy stays off
+ }
+ zeroCopy = zcr;
+ boolean fastRead = false;
+ if (zcr) {
+ for (Method m : Text.class.getMethods()) {
+ if ("readWithKnownLength".equals(m.getName())) {
+ fastRead = true;
+ }
+ }
+ }
+ this.fastRead = fastRead;
+ }
+
+ public DirectDecompressor getDirectDecompressor(
+ DirectCompressionType codec) {
+ return null;
+ }
+
+ @Override
+ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+ ByteBufferPoolShim pool
+ ) throws IOException {
+ if(zeroCopy) {
+ return ZeroCopyShims.getZeroCopyReader(in, pool);
+ }
+ /* not supported */
+ return null;
+ }
+
+ private final class BasicTextReaderShim implements TextReaderShim {
+ private final InputStream in;
+
+ public BasicTextReaderShim(InputStream in) {
+ this.in = in;
+ }
+
+ @Override
+ public void read(Text txt, int len) throws IOException {
+ int offset = 0;
+ byte[] bytes = new byte[len];
+ while (len > 0) {
+ int written = in.read(bytes, offset, len);
+ if (written < 0) {
+ throw new EOFException("Can't finish read from " + in + " read "
+ + (offset) + " bytes out of " + bytes.length);
+ }
+ len -= written;
+ offset += written;
+ }
+ txt.set(bytes);
+ }
+ }
+
+ @Override
+ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+ return new BasicTextReaderShim(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
new file mode 100644
index 0000000..851f645
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.orc.CompressionCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.common.io.DiskRange;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedInputStream;
+
+public abstract class InStream extends InputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
+ public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
+
+ protected final String name;
+ protected long length;
+
+ public InStream(String name, long length) {
+ this.name = name;
+ this.length = length;
+ }
+
+ public String getStreamName() {
+ return name;
+ }
+
+ public long getStreamLength() {
+ return length;
+ }
+
+ @Override
+ public abstract void close();
+
+ public static class UncompressedStream extends InStream {
+ private List<DiskRange> bytes;
+ private long length;
+ protected long currentOffset;
+ private ByteBuffer range;
+ private int currentRange;
+
+ public UncompressedStream(String name, List<DiskRange> input, long length) {
+ super(name, length);
+ reset(input, length);
+ }
+
+ protected void reset(List<DiskRange> input, long length) {
+ this.bytes = input;
+ this.length = length;
+ currentRange = 0;
+ currentOffset = 0;
+ range = null;
+ }
+
+ @Override
+ public int read() {
+ if (range == null || range.remaining() == 0) {
+ if (currentOffset == length) {
+ return -1;
+ }
+ seek(currentOffset);
+ }
+ currentOffset += 1;
+ return 0xff & range.get();
+ }
+
+ @Override
+ public int read(byte[] data, int offset, int length) {
+ if (range == null || range.remaining() == 0) {
+ if (currentOffset == this.length) {
+ return -1;
+ }
+ seek(currentOffset);
+ }
+ int actualLength = Math.min(length, range.remaining());
+ range.get(data, offset, actualLength);
+ currentOffset += actualLength;
+ return actualLength;
+ }
+
+ @Override
+ public int available() {
+ if (range != null && range.remaining() > 0) {
+ return range.remaining();
+ }
+ return (int) (length - currentOffset);
+ }
+
+ @Override
+ public void close() {
+ currentRange = bytes.size();
+ currentOffset = length;
+ // explicit de-ref of bytes[]
+ bytes.clear();
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ seek(index.getNext());
+ }
+
+ public void seek(long desired) {
+ if (desired == 0 && bytes.isEmpty()) {
+ logEmptySeek(name);
+ return;
+ }
+ int i = 0;
+ for (DiskRange curRange : bytes) {
+ if (desired == 0 && curRange.getData().remaining() == 0) {
+ logEmptySeek(name);
+ return;
+ }
+ if (curRange.getOffset() <= desired &&
+ (desired - curRange.getOffset()) < curRange.getLength()) {
+ currentOffset = desired;
+ currentRange = i;
+ this.range = curRange.getData().duplicate();
+ int pos = range.position();
+ pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+ this.range.position(pos);
+ return;
+ }
+ ++i;
+ }
+ // if they are seeking to the precise end, go ahead and let them go there
+ int segments = bytes.size();
+ if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+ currentOffset = desired;
+ currentRange = segments - 1;
+ DiskRange curRange = bytes.get(currentRange);
+ this.range = curRange.getData().duplicate();
+ int pos = range.position();
+ pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
+ this.range.position(pos);
+ return;
+ }
+ throw new IllegalArgumentException("Seek in " + name + " to " +
+ desired + " is outside of the data");
+ }
+
+ @Override
+ public String toString() {
+ return "uncompressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
+ }
+ }
+
+ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+ // TODO: use the same pool as the ORC readers
+ if (isDirect) {
+ return ByteBuffer.allocateDirect(size);
+ } else {
+ return ByteBuffer.allocate(size);
+ }
+ }
+
+ private static class CompressedStream extends InStream {
+ private final List<DiskRange> bytes;
+ private final int bufferSize;
+ private ByteBuffer uncompressed;
+ private final CompressionCodec codec;
+ private ByteBuffer compressed;
+ private long currentOffset;
+ private int currentRange;
+ private boolean isUncompressedOriginal;
+
+ public CompressedStream(String name, List<DiskRange> input, long length,
+ CompressionCodec codec, int bufferSize) {
+ super(name, length);
+ this.bytes = input;
+ this.codec = codec;
+ this.bufferSize = bufferSize;
+ currentOffset = 0;
+ currentRange = 0;
+ }
+
+ private void allocateForUncompressed(int size, boolean isDirect) {
+ uncompressed = allocateBuffer(size, isDirect);
+ }
+
+ private void readHeader() throws IOException {
+ if (compressed == null || compressed.remaining() <= 0) {
+ seek(currentOffset);
+ }
+ if (compressed.remaining() > OutStream.HEADER_SIZE) {
+ int b0 = compressed.get() & 0xff;
+ int b1 = compressed.get() & 0xff;
+ int b2 = compressed.get() & 0xff;
+ boolean isOriginal = (b0 & 0x01) == 1;
+ int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
+ if (chunkLength > bufferSize) {
+ throw new IllegalArgumentException("Buffer size too small. size = " +
+ bufferSize + " needed = " + chunkLength);
+ }
+ // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
+ assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
+ currentOffset += OutStream.HEADER_SIZE;
+
+ ByteBuffer slice = this.slice(chunkLength);
+
+ if (isOriginal) {
+ uncompressed = slice;
+ isUncompressedOriginal = true;
+ } else {
+ if (isUncompressedOriginal) {
+ allocateForUncompressed(bufferSize, slice.isDirect());
+ isUncompressedOriginal = false;
+ } else if (uncompressed == null) {
+ allocateForUncompressed(bufferSize, slice.isDirect());
+ } else {
+ uncompressed.clear();
+ }
+ codec.decompress(slice, uncompressed);
+ }
+ } else {
+ throw new IllegalStateException("Can't read header at " + this);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (uncompressed == null || uncompressed.remaining() == 0) {
+ if (currentOffset == length) {
+ return -1;
+ }
+ readHeader();
+ }
+ return 0xff & uncompressed.get();
+ }
+
+ @Override
+ public int read(byte[] data, int offset, int length) throws IOException {
+ if (uncompressed == null || uncompressed.remaining() == 0) {
+ if (currentOffset == this.length) {
+ return -1;
+ }
+ readHeader();
+ }
+ int actualLength = Math.min(length, uncompressed.remaining());
+ uncompressed.get(data, offset, actualLength);
+ return actualLength;
+ }
+
+ @Override
+ public int available() throws IOException {
+ if (uncompressed == null || uncompressed.remaining() == 0) {
+ if (currentOffset == length) {
+ return 0;
+ }
+ readHeader();
+ }
+ return uncompressed.remaining();
+ }
+
+ @Override
+ public void close() {
+ uncompressed = null;
+ compressed = null;
+ currentRange = bytes.size();
+ currentOffset = length;
+ bytes.clear();
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ seek(index.getNext());
+ long uncompressedBytes = index.getNext();
+ if (uncompressedBytes != 0) {
+ readHeader();
+ uncompressed.position(uncompressed.position() +
+ (int) uncompressedBytes);
+ } else if (uncompressed != null) {
+ // mark the uncompressed buffer as done
+ uncompressed.position(uncompressed.limit());
+ }
+ }
+
+ /* slices a read-only contiguous buffer of chunkLength bytes */
+ private ByteBuffer slice(int chunkLength) throws IOException {
+ int len = chunkLength;
+ final long oldOffset = currentOffset;
+ ByteBuffer slice;
+ if (compressed.remaining() >= len) {
+ slice = compressed.slice();
+ // simple case
+ slice.limit(len);
+ currentOffset += len;
+ compressed.position(compressed.position() + len);
+ return slice;
+ } else if (currentRange >= (bytes.size() - 1)) {
+ // nothing has been modified yet
+ throw new IOException("EOF in " + this + " while trying to read " +
+ chunkLength + " bytes");
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
+ compressed.remaining(), len));
+ }
+
+ // we need to consolidate 2 or more buffers into 1
+ // first copy out compressed buffers
+ ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+ currentOffset += compressed.remaining();
+ len -= compressed.remaining();
+ copy.put(compressed);
+ ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
+
+ while (len > 0 && iter.hasNext()) {
+ ++currentRange;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
+ }
+ DiskRange range = iter.next();
+ compressed = range.getData().duplicate();
+ if (compressed.remaining() >= len) {
+ slice = compressed.slice();
+ slice.limit(len);
+ copy.put(slice);
+ currentOffset += len;
+ compressed.position(compressed.position() + len);
+ return copy;
+ }
+ currentOffset += compressed.remaining();
+ len -= compressed.remaining();
+ copy.put(compressed);
+ }
+
+ // restore offsets for exception clarity
+ seek(oldOffset);
+ throw new IOException("EOF in " + this + " while trying to read " +
+ chunkLength + " bytes");
+ }
+
+ private void seek(long desired) throws IOException {
+ if (desired == 0 && bytes.isEmpty()) {
+ logEmptySeek(name);
+ return;
+ }
+ int i = 0;
+ for (DiskRange range : bytes) {
+ if (range.getOffset() <= desired && desired < range.getEnd()) {
+ currentRange = i;
+ compressed = range.getData().duplicate();
+ int pos = compressed.position();
+ pos += (int)(desired - range.getOffset());
+ compressed.position(pos);
+ currentOffset = desired;
+ return;
+ }
+ ++i;
+ }
+ // if they are seeking to the precise end, go ahead and let them go there
+ int segments = bytes.size();
+ if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
+ DiskRange range = bytes.get(segments - 1);
+ currentRange = segments - 1;
+ compressed = range.getData().duplicate();
+ compressed.position(compressed.limit());
+ currentOffset = desired;
+ return;
+ }
+ throw new IOException("Seek outside of data in " + this + " to " + desired);
+ }
+
+ private String rangeString() {
+ StringBuilder builder = new StringBuilder();
+ int i = 0;
+ for (DiskRange range : bytes) {
+ if (i != 0) {
+ builder.append("; ");
+ }
+ builder.append(" range " + i + " = " + range.getOffset()
+ + " to " + (range.getEnd() - range.getOffset()));
+ ++i;
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public String toString() {
+ return "compressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
+ rangeString() +
+ (uncompressed == null ? "" :
+ " uncompressed: " + uncompressed.position() + " to " +
+ uncompressed.limit());
+ }
+ }
+
+ public abstract void seek(PositionProvider index) throws IOException;
+
+ private static void logEmptySeek(String name) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream.");
+ }
+ }
+
+ /**
+ * Create an input stream from a list of buffers.
+ * @param streamName the name of the stream
+ * @param buffers the list of ranges of bytes for the stream
+ * @param offsets a list of offsets (the same length as buffers) that must
+ * contain the first offset of each set of bytes in buffers
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return an input stream
+ * @throws IOException
+ */
+ @VisibleForTesting
+ @Deprecated
+ public static InStream create(String streamName,
+ ByteBuffer[] buffers,
+ long[] offsets,
+ long length,
+ CompressionCodec codec,
+ int bufferSize) throws IOException {
+ List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+ for (int i = 0; i < buffers.length; ++i) {
+ input.add(new BufferChunk(buffers[i], offsets[i]));
+ }
+ return create(streamName, input, length, codec, bufferSize);
+ }
+
+ /**
+ * Create an input stream from a list of disk ranges with data.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return an input stream
+ * @throws IOException
+ */
+ public static InStream create(String name,
+ List<DiskRange> input,
+ long length,
+ CompressionCodec codec,
+ int bufferSize) throws IOException {
+ if (codec == null) {
+ return new UncompressedStream(name, input, length);
+ } else {
+ return new CompressedStream(name, input, length, codec, bufferSize);
+ }
+ }
+
+ /**
+ * Creates a coded input stream (used for protobuf message parsing) with a higher message size limit.
+ *
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return coded input stream
+ * @throws IOException
+ */
+ public static CodedInputStream createCodedInputStream(
+ String name,
+ List<DiskRange> input,
+ long length,
+ CompressionCodec codec,
+ int bufferSize) throws IOException {
+ InStream inStream = create(name, input, length, codec, bufferSize);
+ CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
+ codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
+ return codedInputStream;
+ }
+}
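
The three header bytes decoded in CompressedStream.readHeader() carry the
chunk length shifted left one bit, with the low bit of the first byte flagging
an uncompressed ("original") chunk. A worked example, using the ORC
specification's header for a chunk that compressed to 100,000 bytes:

    int b0 = 0x40, b1 = 0x0d, b2 = 0x03;    // header bytes on disk: 40 0d 03
    boolean isOriginal = (b0 & 0x01) == 1;  // false: the chunk is compressed
    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
    // (3 << 15) + (13 << 7) + (0x40 >> 1) = 98304 + 1664 + 32 = 100000
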
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/IntegerReader.java b/java/core/src/java/org/apache/orc/impl/IntegerReader.java
new file mode 100644
index 0000000..3e64d54
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/IntegerReader.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * Interface for reading integers.
+ */
+public interface IntegerReader {
+
+ /**
+ * Seek to the position provided by index.
+ * @param index
+ * @throws IOException
+ */
+ void seek(PositionProvider index) throws IOException;
+
+ /**
+ * Skip the specified number of values.
+ * @param numValues the number of values to skip
+ * @throws IOException
+ */
+ void skip(long numValues) throws IOException;
+
+ /**
+ * Check if there are any more values left.
+ * @return true if there is at least one more value
+ * @throws IOException
+ */
+ boolean hasNext() throws IOException;
+
+ /**
+ * Return the next available value.
+ * @return the next value
+ * @throws IOException
+ */
+ long next() throws IOException;
+
+ /**
+ * Return the next available vector for values.
+ * @param column the column being read
+ * @param data the vector to read into
+ * @param length the number of numbers to read
+ * @throws IOException
+ */
+ void nextVector(ColumnVector column,
+ long[] data,
+ int length
+ ) throws IOException;
+
+ /**
+ * Return the next available vector for values. Does not change the
+ * value of column.isRepeating.
+ * @param column the column being read
+ * @param data the vector to read into
+ * @param length the number of numbers to read
+ * @throws IOException
+ */
+ void nextVector(ColumnVector column,
+ int[] data,
+ int length
+ ) throws IOException;
+}
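
A sketch of the scalar read loop the interface supports; 'reader' here stands
for any IntegerReader implementation, such as the run-length readers imported
elsewhere in this commit:

    long sum = 0;
    while (reader.hasNext()) {  // 'reader' is an IntegerReader obtained elsewhere
      sum += reader.next();
    }
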
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
new file mode 100644
index 0000000..419054f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * Interface for writing integers.
+ */
+public interface IntegerWriter {
+
+ /**
+ * Get position from the stream.
+ * @param recorder
+ * @throws IOException
+ */
+ void getPosition(PositionRecorder recorder) throws IOException;
+
+ /**
+ * Write the integer value
+ * @param value
+ * @throws IOException
+ */
+ void write(long value) throws IOException;
+
+ /**
+ * Flush the buffer
+ * @throws IOException
+ */
+ void flush() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/MemoryManager.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/MemoryManager.java b/java/core/src/java/org/apache/orc/impl/MemoryManager.java
new file mode 100644
index 0000000..757c0b4
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/MemoryManager.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Implements a memory manager that keeps a global context of how many ORC
+ * writers there are and manages the memory between them. For use cases with
+ * dynamic partitions, it is easy to end up with many writers in the same task.
+ * By managing the size of each allocation, we try to cut down the size of each
+ * allocation and keep the task from running out of memory.
+ *
+ * This class is not thread safe, but is re-entrant - ensure creation and all
+ * invocations are triggered from the same thread.
+ */
+public class MemoryManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
+
+ /**
+ * How often should we check the memory sizes? Measured in rows added
+ * to all of the writers.
+ */
+ private static final int ROWS_BETWEEN_CHECKS = 5000;
+ private final long totalMemoryPool;
+ private final Map<Path, WriterInfo> writerList =
+ new HashMap<Path, WriterInfo>();
+ private long totalAllocation = 0;
+ private double currentScale = 1;
+ private int rowsAddedSinceCheck = 0;
+ private final OwnedLock ownerLock = new OwnedLock();
+
+ @SuppressWarnings("serial")
+ private static class OwnedLock extends ReentrantLock {
+ public Thread getOwner() {
+ return super.getOwner();
+ }
+ }
+
+ private static class WriterInfo {
+ long allocation;
+ Callback callback;
+ WriterInfo(long allocation, Callback callback) {
+ this.allocation = allocation;
+ this.callback = callback;
+ }
+ }
+
+ public interface Callback {
+ /**
+ * The writer needs to check its memory usage
+ * @param newScale the current scale factor for memory allocations
+ * @return true if the writer was over the limit
+ * @throws IOException
+ */
+ boolean checkMemory(double newScale) throws IOException;
+ }
+
+ /**
+ * Create the memory manager.
+ * @param conf use the configuration to find the maximum size of the memory
+ * pool.
+ */
+ public MemoryManager(Configuration conf) {
+ double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
+ totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
+ getHeapMemoryUsage().getMax() * maxLoad);
+ ownerLock.lock();
+ }
+
+ /**
+ * Lightweight thread-safety check for multi-threaded access patterns
+ */
+ private void checkOwner() {
+ if (!ownerLock.isHeldByCurrentThread()) {
+ LOG.warn("Owner thread expected {}, got {}",
+ ownerLock.getOwner(), Thread.currentThread());
+ }
+ }
+
+ /**
+ * Add a new writer's memory allocation to the pool. We use the path
+ * as a unique key to ensure that we don't get duplicates.
+ * @param path the file that is being written
+ * @param requestedAllocation the requested buffer size
+ */
+ public void addWriter(Path path, long requestedAllocation,
+ Callback callback) throws IOException {
+ checkOwner();
+ WriterInfo oldVal = writerList.get(path);
+ // this should always be null, but we handle the case where the memory
+ // manager wasn't told that a writer was closed and the task
+ // starts writing to the same path.
+ if (oldVal == null) {
+ oldVal = new WriterInfo(requestedAllocation, callback);
+ writerList.put(path, oldVal);
+ totalAllocation += requestedAllocation;
+ } else {
+ // handle a new writer that is writing to the same path
+ totalAllocation += requestedAllocation - oldVal.allocation;
+ oldVal.allocation = requestedAllocation;
+ oldVal.callback = callback;
+ }
+ updateScale(true);
+ }
+
+ /**
+ * Remove the given writer from the pool.
+ * @param path the file that has been closed
+ */
+ public void removeWriter(Path path) throws IOException {
+ checkOwner();
+ WriterInfo val = writerList.remove(path);
+ if (val != null) {
+ totalAllocation -= val.allocation;
+ updateScale(false);
+ }
+ if (writerList.isEmpty()) {
+ rowsAddedSinceCheck = 0;
+ }
+ }
+
+ /**
+ * Get the total pool size that is available for ORC writers.
+ * @return the number of bytes in the pool
+ */
+ public long getTotalMemoryPool() {
+ return totalMemoryPool;
+ }
+
+ /**
+ * The scaling factor for each allocation to ensure that the pool isn't
+ * oversubscribed.
+ * @return a fraction between 0.0 and 1.0 of the requested size that is
+ * available for each writer.
+ */
+ public double getAllocationScale() {
+ return currentScale;
+ }
+
+ /**
+ * Give the memory manager an opportunity for doing a memory check.
+ * @param rows number of rows added
+ * @throws IOException
+ */
+ public void addedRow(int rows) throws IOException {
+ rowsAddedSinceCheck += rows;
+ if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+ notifyWriters();
+ }
+ }
+
+ /**
+ * Notify all of the writers that they should check their memory usage.
+ * @throws IOException
+ */
+ public void notifyWriters() throws IOException {
+ checkOwner();
+ LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
+ for(WriterInfo writer: writerList.values()) {
+ boolean flushed = writer.callback.checkMemory(currentScale);
+ if (LOG.isDebugEnabled() && flushed) {
+ LOG.debug("flushed " + writer.toString());
+ }
+ }
+ rowsAddedSinceCheck = 0;
+ }
+
+ /**
+ * Update the currentScale based on the current allocation and pool size.
+ * @param isAllocate is this an allocation?
+ */
+ private void updateScale(boolean isAllocate) throws IOException {
+ if (totalAllocation <= totalMemoryPool) {
+ currentScale = 1;
+ } else {
+ currentScale = (double) totalMemoryPool / totalAllocation;
+ }
+ }
+}
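
The scaling rule in updateScale() is simply pool / allocation, clamped at 1.0.
With illustrative numbers: a 1 GB pool and four writers that each registered
400 MB gives a totalAllocation of 1.6 GB, so every callback sees
newScale = 1.0 / 1.6 = 0.625 at the next check. A sketch of registering one
such writer:

    MemoryManager mgr = new MemoryManager(new Configuration());
    mgr.addWriter(new Path("/tmp/part-00000.orc"), 400L << 20, // hypothetical path
        new MemoryManager.Callback() {
          @Override
          public boolean checkMemory(double newScale) {
            // size write buffers to roughly allocation * newScale
            return false; // return true only if a flush actually happened
          }
        });
    mgr.addedRow(5000); // hits ROWS_BETWEEN_CHECKS, so the callback fires
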
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java b/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java
new file mode 100644
index 0000000..72c7f54
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.Reader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+public class OrcAcidUtils {
+ public static final String ACID_STATS = "hive.acid.stats";
+ public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
+ /**
+ * Get the filename of the ORC ACID side file that contains the lengths
+ * of the intermediate footers.
+ * @param main the main ORC filename
+ * @return the name of the side file
+ */
+ public static Path getSideFile(Path main) {
+ return new Path(main + DELTA_SIDE_FILE_SUFFIX);
+ }
+
+ /**
+ * Read the side file to get the last flush length.
+ * @param fs the file system to use
+ * @param deltaFile the path of the delta file
+ * @return the maximum size of the file to use
+ * @throws IOException
+ */
+ public static long getLastFlushLength(FileSystem fs,
+ Path deltaFile) throws IOException {
+ Path lengths = getSideFile(deltaFile);
+ long result = Long.MAX_VALUE;
+ try (FSDataInputStream stream = fs.open(lengths)) {
+ // the side file is a sequence of longs; keep the last one that is
+ // completely readable. An empty side file yields -1.
+ result = -1;
+ while (stream.available() > 0) {
+ result = stream.readLong();
+ }
+ return result;
+ } catch (IOException ioe) {
+ // a missing side file means the delta file is complete; a read that
+ // fails partway falls back to the last value read successfully
+ return result;
+ }
+ }
+
+ private static final Charset utf8 = Charset.forName("UTF-8");
+
+ public static AcidStats parseAcidStats(Reader reader) {
+ if (reader.hasMetadataValue(ACID_STATS)) {
+ try {
+ ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate();
+ // CharsetDecoder is stateful and not thread-safe, so create one
+ // per call rather than sharing a static instance
+ CharsetDecoder decoder = utf8.newDecoder();
+ return new AcidStats(decoder.decode(val).toString());
+ } catch (CharacterCodingException e) {
+ throw new IllegalArgumentException("Bad string encoding for " +
+ ACID_STATS, e);
+ }
+ } else {
+ return null;
+ }
+ }
+
+}
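
The side file is located by plain string suffixing of the delta path. A
hypothetical usage sketch (paths invented for illustration):

    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path delta = new Path("/warehouse/t/delta_0000001_0000001/bucket_00000");
    Path side = OrcAcidUtils.getSideFile(delta);
    // side is .../bucket_00000_flush_length
    long len = OrcAcidUtils.getLastFlushLength(fs, delta);
    // Long.MAX_VALUE if no side file exists (use the whole delta file),
    // -1 if the side file is empty, otherwise the last flushed length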
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcIndex.java b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
new file mode 100644
index 0000000..50a15f2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OrcIndex.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.OrcProto;
+
+public final class OrcIndex {
+ OrcProto.RowIndex[] rowGroupIndex;
+ OrcProto.BloomFilterIndex[] bloomFilterIndex;
+
+ public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
+ this.rowGroupIndex = rgIndex;
+ this.bloomFilterIndex = bfIndex;
+ }
+
+ public OrcProto.RowIndex[] getRowGroupIndex() {
+ return rowGroupIndex;
+ }
+
+ public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
+ return bloomFilterIndex;
+ }
+
+ public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
+ this.rowGroupIndex = rowGroupIndex;
+ }
+}
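
Assuming the standard protobuf-generated accessors on OrcProto.RowIndex and
OrcProto.RowIndexEntry, a reader would pull the recorded positions for one
row group of one column roughly like this (illustrative sketch only):

    static java.util.List<Long> positionsFor(OrcIndex index,
                                             int column, int rowGroup) {
      // each column has one RowIndex; each row group has one entry whose
      // positions drive a PositionProvider when seeking the streams
      return index.getRowGroupIndex()[column]
          .getEntry(rowGroup).getPositionsList();
    }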
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
new file mode 100644
index 0000000..81662cc
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class OutStream extends PositionedOutputStream {
+
+ public interface OutputReceiver {
+ /**
+ * Output the given buffer to the final destination
+ * @param buffer the buffer to output
+ * @throws IOException
+ */
+ void output(ByteBuffer buffer) throws IOException;
+ }
+
+ public static final int HEADER_SIZE = 3;
+ private final String name;
+ private final OutputReceiver receiver;
+ // if true, the stream is suppressed when the stripe is written
+ private boolean suppress;
+
+ /**
+ * Stores the uncompressed bytes that have been serialized, but not
+ * compressed yet. When this fills, we compress the entire buffer.
+ */
+ private ByteBuffer current = null;
+
+ /**
+ * Stores the compressed bytes until we have a full buffer and then outputs
+ * them to the receiver. If no compression is being done, this (and overflow)
+ * will always be null and the current buffer will be sent directly to the
+ * receiver.
+ */
+ private ByteBuffer compressed = null;
+
+ /**
+ * Since the compressed buffer may start with contents from previous
+ * compression blocks, we allocate an overflow buffer so that the
+ * output of the codec can be split between the two buffers. After the
+ * compressed buffer is sent to the receiver, the overflow buffer becomes
+ * the new compressed buffer.
+ */
+ private ByteBuffer overflow = null;
+ private final int bufferSize;
+ private final CompressionCodec codec;
+ private long compressedBytes = 0;
+ private long uncompressedBytes = 0;
+
+ public OutStream(String name,
+ int bufferSize,
+ CompressionCodec codec,
+ OutputReceiver receiver) throws IOException {
+ this.name = name;
+ this.bufferSize = bufferSize;
+ this.codec = codec;
+ this.receiver = receiver;
+ this.suppress = false;
+ }
+
+ public void clear() throws IOException {
+ flush();
+ suppress = false;
+ }
+
+ /**
+ * Write the length of the compressed bytes. Life is much easier if the
+ * header is constant length, so just use 3 bytes. Considering most of the
+ * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
+ * be plenty. We also use the low bit for whether it is the original or
+ * compressed bytes.
+ * @param buffer the buffer to write the header to
+ * @param position the position in the buffer to write at
+ * @param val the size in the file
+ * @param original is it uncompressed
+ */
+ private static void writeHeader(ByteBuffer buffer,
+ int position,
+ int val,
+ boolean original) {
+ buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
+ buffer.put(position + 1, (byte) (val >> 7));
+ buffer.put(position + 2, (byte) (val >> 15));
+ }
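+
+ // Worked example: a compressed chunk of 100,000 bytes stores
+ // (100000 << 1) = 200000 little-endian in the three header bytes:
+ // byte 0 = 0x40, byte 1 = 0x0D, byte 2 = 0x03. The low bit of
+ // byte 0 is 0, marking the chunk as compressed rather than original.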
+
+ private void getNewInputBuffer() throws IOException {
+ if (codec == null) {
+ current = ByteBuffer.allocate(bufferSize);
+ } else {
+ current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+ writeHeader(current, 0, bufferSize, true);
+ current.position(HEADER_SIZE);
+ }
+ }
+
+ /**
+ * Allocate a new output buffer if we are compressing.
+ */
+ private ByteBuffer getNewOutputBuffer() throws IOException {
+ return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
+ }
+
+ private void flip() throws IOException {
+ current.limit(current.position());
+ current.position(codec == null ? 0 : HEADER_SIZE);
+ }
+
+ @Override
+ public void write(int i) throws IOException {
+ if (current == null) {
+ getNewInputBuffer();
+ }
+ if (current.remaining() < 1) {
+ spill();
+ }
+ uncompressedBytes += 1;
+ current.put((byte) i);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int length) throws IOException {
+ if (current == null) {
+ getNewInputBuffer();
+ }
+ int remaining = Math.min(current.remaining(), length);
+ current.put(bytes, offset, remaining);
+ uncompressedBytes += remaining;
+ length -= remaining;
+ while (length != 0) {
+ spill();
+ offset += remaining;
+ remaining = Math.min(current.remaining(), length);
+ current.put(bytes, offset, remaining);
+ uncompressedBytes += remaining;
+ length -= remaining;
+ }
+ }
+
+ private void spill() throws IOException {
+ // if there isn't anything in the current buffer, don't spill
+ if (current == null ||
+ current.position() == (codec == null ? 0 : HEADER_SIZE)) {
+ return;
+ }
+ flip();
+ if (codec == null) {
+ receiver.output(current);
+ getNewInputBuffer();
+ } else {
+ if (compressed == null) {
+ compressed = getNewOutputBuffer();
+ } else if (overflow == null) {
+ overflow = getNewOutputBuffer();
+ }
+ int sizePosn = compressed.position();
+ compressed.position(compressed.position() + HEADER_SIZE);
+ if (codec.compress(current, compressed, overflow)) {
+ uncompressedBytes = 0;
+ // move position back to after the header
+ current.position(HEADER_SIZE);
+ current.limit(current.capacity());
+ // find the total bytes in the chunk
+ int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
+ if (overflow != null) {
+ totalBytes += overflow.position();
+ }
+ compressedBytes += totalBytes + HEADER_SIZE;
+ writeHeader(compressed, sizePosn, totalBytes, false);
+ // if we have less than the next header left, spill it.
+ if (compressed.remaining() < HEADER_SIZE) {
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = overflow;
+ overflow = null;
+ }
+ } else {
+ compressedBytes += uncompressedBytes + HEADER_SIZE;
+ uncompressedBytes = 0;
+ // we are using the original, but need to spill the current
+ // compressed buffer first. So back up to where we started,
+ // flip it, and send it to the receiver.
+ if (sizePosn != 0) {
+ compressed.position(sizePosn);
+ compressed.flip();
+ receiver.output(compressed);
+ compressed = null;
+ // if we have an overflow, clear it and make it the new compressed
+ // buffer
+ if (overflow != null) {
+ overflow.clear();
+ compressed = overflow;
+ overflow = null;
+ }
+ } else {
+ compressed.clear();
+ if (overflow != null) {
+ overflow.clear();
+ }
+ }
+
+ // now send the current buffer to the receiver and get a new one.
+ current.position(0);
+ // update the header with the current length
+ writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
+ receiver.output(current);
+ getNewInputBuffer();
+ }
+ }
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ if (codec == null) {
+ recorder.addPosition(uncompressedBytes);
+ } else {
+ recorder.addPosition(compressedBytes);
+ recorder.addPosition(uncompressedBytes);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ spill();
+ if (compressed != null && compressed.position() != 0) {
+ compressed.flip();
+ receiver.output(compressed);
+ }
+ compressed = null;
+ uncompressedBytes = 0;
+ compressedBytes = 0;
+ overflow = null;
+ current = null;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ @Override
+ public long getBufferSize() {
+ long result = 0;
+ if (current != null) {
+ result += current.capacity();
+ }
+ if (compressed != null) {
+ result += compressed.capacity();
+ }
+ if (overflow != null) {
+ result += overflow.capacity();
+ }
+ return result;
+ }
+
+ /**
+ * Set the suppress flag.
+ */
+ public void suppress() {
+ suppress = true;
+ }
+
+ /**
+ * Get the state of the suppress flag.
+ * @return the value of the suppress flag
+ */
+ public boolean isSuppressed() {
+ return suppress;
+ }
+}
+
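
As a usage sketch (not part of the patch), an OutStream built with a null
codec passes each filled buffer straight through to its OutputReceiver,
which makes the buffering behavior easy to observe:

    final List<ByteBuffer> chunks = new ArrayList<>();
    OutStream out = new OutStream("test", 1024, null,
        new OutStream.OutputReceiver() {
          public void output(ByteBuffer buffer) {
            chunks.add(buffer);      // each spilled buffer arrives here
          }
        });
    out.write(new byte[3000]);       // spills two full 1024-byte buffers
    out.flush();                     // sends the remaining 952 bytes
    // chunks now holds three buffers totaling 3000 bytes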
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionProvider.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PositionProvider.java b/java/core/src/java/org/apache/orc/impl/PositionProvider.java
new file mode 100644
index 0000000..47cf481
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PositionProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+public interface PositionProvider {
+ long getNext();
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PositionRecorder.java b/java/core/src/java/org/apache/orc/impl/PositionRecorder.java
new file mode 100644
index 0000000..1fff760
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+public interface PositionRecorder {
+ void addPosition(long offset);
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java b/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java
new file mode 100644
index 0000000..d412939
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionedOutputStream extends OutputStream {
+
+ /**
+ * Record the current position to the recorder.
+ * @param recorder the object that receives the position
+ * @throws IOException
+ */
+ public abstract void getPosition(PositionRecorder recorder)
+ throws IOException;
+
+ /**
+ * Get the memory size currently allocated as buffer associated with this
+ * stream.
+ * @return the number of bytes used by buffers.
+ */
+ public abstract long getBufferSize();
+}
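
A PositionRecorder can be as small as a list that captures whatever a
stream reports through getPosition; a minimal sketch:

    class ListRecorder implements PositionRecorder {
      private final java.util.List<Long> positions =
          new java.util.ArrayList<Long>();
      public void addPosition(long offset) {
        positions.add(offset);
      }
      public java.util.List<Long> getPositions() {
        return positions;
      }
    }
    // with no codec, OutStream.getPosition records one value (the
    // uncompressed byte count); with a codec it records two (compressed
    // bytes written, then uncompressed bytes pending in the current buffer)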