Posted to commits@hive.apache.org by om...@apache.org on 2017/02/03 16:38:16 UTC
[17/22] hive git commit: HIVE-14007. Replace hive-orc module with ORC 1.3.1
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
deleted file mode 100644
index 22301e8..0000000
--- a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionKind;
-
-import javax.annotation.Nullable;
-
-public final class DataReaderProperties {
-
- private final FileSystem fileSystem;
- private final Path path;
- private final CompressionKind compression;
- private final boolean zeroCopy;
- private final int typeCount;
- private final int bufferSize;
-
- private DataReaderProperties(Builder builder) {
- this.fileSystem = builder.fileSystem;
- this.path = builder.path;
- this.compression = builder.compression;
- this.zeroCopy = builder.zeroCopy;
- this.typeCount = builder.typeCount;
- this.bufferSize = builder.bufferSize;
- }
-
- public FileSystem getFileSystem() {
- return fileSystem;
- }
-
- public Path getPath() {
- return path;
- }
-
- public CompressionKind getCompression() {
- return compression;
- }
-
- public boolean getZeroCopy() {
- return zeroCopy;
- }
-
- public int getTypeCount() {
- return typeCount;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
-
- private FileSystem fileSystem;
- private Path path;
- private CompressionKind compression;
- private boolean zeroCopy;
- private int typeCount;
- private int bufferSize;
-
- private Builder() {
-
- }
-
- public Builder withFileSystem(FileSystem fileSystem) {
- this.fileSystem = fileSystem;
- return this;
- }
-
- public Builder withPath(Path path) {
- this.path = path;
- return this;
- }
-
- public Builder withCompression(CompressionKind value) {
- this.compression = value;
- return this;
- }
-
- public Builder withZeroCopy(boolean zeroCopy) {
- this.zeroCopy = zeroCopy;
- return this;
- }
-
- public Builder withTypeCount(int value) {
- this.typeCount = value;
- return this;
- }
-
- public Builder withBufferSize(int value) {
- this.bufferSize = value;
- return this;
- }
-
- public DataReaderProperties build() {
- Preconditions.checkNotNull(fileSystem);
- Preconditions.checkNotNull(path);
-
- return new DataReaderProperties(this);
- }
-
- }
-}
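
For context, the removed properties class follows the standard builder pattern; a minimal usage sketch (the local file system, path, and buffer size below are illustrative, not taken from the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.DataReaderProperties;

    public class DataReaderPropertiesExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // build() enforces the two Preconditions: fileSystem and path must be non-null
        DataReaderProperties props = DataReaderProperties.builder()
            .withFileSystem(fs)
            .withPath(new Path("/tmp/example.orc"))  // illustrative path
            .withCompression(CompressionKind.ZLIB)
            .withZeroCopy(false)
            .withBufferSize(256 * 1024)
            .build();
        System.out.println(props.getPath() + " " + props.getCompression());
      }
    }
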
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java b/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
deleted file mode 100644
index 7e0110d..0000000
--- a/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import org.apache.orc.CompressionCodec;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public interface DirectDecompressionCodec extends CompressionCodec {
- public boolean isAvailable();
- public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException;
-}
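
A caller is expected to probe isAvailable() before taking the buffer-to-buffer path; a minimal sketch against the interface (the helper name is hypothetical):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.orc.impl.DirectDecompressionCodec;

    public class DirectDecompressExample {
      // Use the direct path when present; otherwise signal the caller to
      // fall back to the codec's ordinary streaming decompression.
      static boolean tryDirectDecompress(DirectDecompressionCodec codec,
                                         ByteBuffer in, ByteBuffer out) throws IOException {
        if (!codec.isAvailable()) {
          return false;
        }
        codec.directDecompress(in, out);  // decompress buffer-to-buffer
        return true;
      }
    }
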
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DynamicByteArray.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DynamicByteArray.java b/orc/src/java/org/apache/orc/impl/DynamicByteArray.java
deleted file mode 100644
index 986c2ac..0000000
--- a/orc/src/java/org/apache/orc/impl/DynamicByteArray.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.io.Text;
-
-/**
- * A class that is a growable array of bytes. Growth is managed in terms of
- * chunks that are allocated when needed.
- */
-public final class DynamicByteArray {
- static final int DEFAULT_CHUNKSIZE = 32 * 1024;
- static final int DEFAULT_NUM_CHUNKS = 128;
-
- private final int chunkSize; // our allocation sizes
- private byte[][] data; // the real data
- private int length; // max set element index +1
- private int initializedChunks = 0; // the number of chunks created
-
- public DynamicByteArray() {
- this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE);
- }
-
- public DynamicByteArray(int numChunks, int chunkSize) {
- if (chunkSize == 0) {
- throw new IllegalArgumentException("bad chunksize");
- }
- this.chunkSize = chunkSize;
- data = new byte[numChunks][];
- }
-
- /**
- * Ensure that the given index is valid.
- */
- private void grow(int chunkIndex) {
- if (chunkIndex >= initializedChunks) {
- if (chunkIndex >= data.length) {
- int newSize = Math.max(chunkIndex + 1, 2 * data.length);
- byte[][] newChunk = new byte[newSize][];
- System.arraycopy(data, 0, newChunk, 0, data.length);
- data = newChunk;
- }
- for(int i=initializedChunks; i <= chunkIndex; ++i) {
- data[i] = new byte[chunkSize];
- }
- initializedChunks = chunkIndex + 1;
- }
- }
-
- public byte get(int index) {
- if (index >= length) {
- throw new IndexOutOfBoundsException("Index " + index +
- " is outside of 0.." +
- (length - 1));
- }
- int i = index / chunkSize;
- int j = index % chunkSize;
- return data[i][j];
- }
-
- public void set(int index, byte value) {
- int i = index / chunkSize;
- int j = index % chunkSize;
- grow(i);
- if (index >= length) {
- length = index + 1;
- }
- data[i][j] = value;
- }
-
- public int add(byte value) {
- int i = length / chunkSize;
- int j = length % chunkSize;
- grow(i);
- data[i][j] = value;
- int result = length;
- length += 1;
- return result;
- }
-
- /**
- * Copy a slice of a byte array into our buffer.
- * @param value the array to copy from
- * @param valueOffset the first location to copy from value
- * @param valueLength the number of bytes to copy from value
- * @return the offset of the start of the value
- */
- public int add(byte[] value, int valueOffset, int valueLength) {
- int i = length / chunkSize;
- int j = length % chunkSize;
- grow((length + valueLength) / chunkSize);
- int remaining = valueLength;
- while (remaining > 0) {
- int size = Math.min(remaining, chunkSize - j);
- System.arraycopy(value, valueOffset, data[i], j, size);
- remaining -= size;
- valueOffset += size;
- i += 1;
- j = 0;
- }
- int result = length;
- length += valueLength;
- return result;
- }
-
- /**
- * Read the entire stream into this array.
- * @param in the stream to read from
- * @throws IOException
- */
- public void readAll(InputStream in) throws IOException {
- int currentChunk = length / chunkSize;
- int currentOffset = length % chunkSize;
- grow(currentChunk);
- int currentLength = in.read(data[currentChunk], currentOffset,
- chunkSize - currentOffset);
- while (currentLength > 0) {
- length += currentLength;
- currentOffset = length % chunkSize;
- if (currentOffset == 0) {
- currentChunk = length / chunkSize;
- grow(currentChunk);
- }
- currentLength = in.read(data[currentChunk], currentOffset,
- chunkSize - currentOffset);
- }
- }
-
- /**
- * Byte compare a set of bytes against the bytes in this dynamic array.
- * @param other source of the other bytes
- * @param otherOffset start offset in the other array
- * @param otherLength number of bytes in the other array
- * @param ourOffset the offset in our array
- * @param ourLength the number of bytes in our array
- * @return negative for less, 0 for equal, positive for greater
- */
- public int compare(byte[] other, int otherOffset, int otherLength,
- int ourOffset, int ourLength) {
- int currentChunk = ourOffset / chunkSize;
- int currentOffset = ourOffset % chunkSize;
- int maxLength = Math.min(otherLength, ourLength);
- while (maxLength > 0 &&
- other[otherOffset] == data[currentChunk][currentOffset]) {
- otherOffset += 1;
- currentOffset += 1;
- if (currentOffset == chunkSize) {
- currentChunk += 1;
- currentOffset = 0;
- }
- maxLength -= 1;
- }
- if (maxLength == 0) {
- return otherLength - ourLength;
- }
- int otherByte = 0xff & other[otherOffset];
- int ourByte = 0xff & data[currentChunk][currentOffset];
- return otherByte > ourByte ? 1 : -1;
- }
-
- /**
- * Get the size of the array.
- * @return the number of bytes in the array
- */
- public int size() {
- return length;
- }
-
- /**
- * Clear the array to its original pristine state.
- */
- public void clear() {
- length = 0;
- for(int i=0; i < data.length; ++i) {
- data[i] = null;
- }
- initializedChunks = 0;
- }
-
- /**
- * Set a text value from the bytes in this dynamic array.
- * @param result the value to set
- * @param offset the start of the bytes to copy
- * @param length the number of bytes to copy
- */
- public void setText(Text result, int offset, int length) {
- result.clear();
- int currentChunk = offset / chunkSize;
- int currentOffset = offset % chunkSize;
- int currentLength = Math.min(length, chunkSize - currentOffset);
- while (length > 0) {
- result.append(data[currentChunk], currentOffset, currentLength);
- length -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- currentLength = Math.min(length, chunkSize - currentOffset);
- }
- }
-
- /**
- * Write out a range of this dynamic array to an output stream.
- * @param out the stream to write to
- * @param offset the first offset to write
- * @param length the number of bytes to write
- * @throws IOException
- */
- public void write(OutputStream out, int offset,
- int length) throws IOException {
- int currentChunk = offset / chunkSize;
- int currentOffset = offset % chunkSize;
- while (length > 0) {
- int currentLength = Math.min(length, chunkSize - currentOffset);
- out.write(data[currentChunk], currentOffset, currentLength);
- length -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- }
- }
-
- @Override
- public String toString() {
- int i;
- StringBuilder sb = new StringBuilder(length * 3);
-
- sb.append('{');
- int l = length - 1;
- for (i=0; i<l; i++) {
- sb.append(Integer.toHexString(get(i)));
- sb.append(',');
- }
- sb.append(get(i));
- sb.append('}');
-
- return sb.toString();
- }
-
- public void setByteBuffer(ByteBuffer result, int offset, int length) {
- result.clear();
- int currentChunk = offset / chunkSize;
- int currentOffset = offset % chunkSize;
- int currentLength = Math.min(length, chunkSize - currentOffset);
- while (length > 0) {
- result.put(data[currentChunk], currentOffset, currentLength);
- length -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- currentLength = Math.min(length, chunkSize - currentOffset);
- }
- }
-
- /**
- * Gets all the bytes of the array.
- *
- * @return Bytes of the array
- */
- public byte[] get() {
- byte[] result = null;
- if (length > 0) {
- int currentChunk = 0;
- int currentOffset = 0;
- int currentLength = Math.min(length, chunkSize);
- int destOffset = 0;
- result = new byte[length];
- int totalLength = length;
- while (totalLength > 0) {
- System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
- destOffset += currentLength;
- totalLength -= currentLength;
- currentChunk += 1;
- currentOffset = 0;
- currentLength = Math.min(totalLength, chunkSize - currentOffset);
- }
- }
- return result;
- }
-
- /**
- * Get the size of the buffers.
- */
- public long getSizeInBytes() {
- return initializedChunks * chunkSize;
- }
-}
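
The add/setText pair above is how string data round-trips through the chunked buffer; a small sketch (values illustrative):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.impl.DynamicByteArray;

    public class DynamicByteArrayExample {
      public static void main(String[] args) {
        DynamicByteArray buf = new DynamicByteArray();
        byte[] word = "hello".getBytes(StandardCharsets.UTF_8);
        int offset = buf.add(word, 0, word.length);  // returns the start offset of the value
        Text t = new Text();
        buf.setText(t, offset, word.length);         // copy the same slice back out
        System.out.println(t + " size=" + buf.size());
      }
    }
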
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DynamicIntArray.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/DynamicIntArray.java b/orc/src/java/org/apache/orc/impl/DynamicIntArray.java
deleted file mode 100644
index 3b2884b..0000000
--- a/orc/src/java/org/apache/orc/impl/DynamicIntArray.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-/**
- * Dynamic int array that uses primitive types and chunks to avoid copying
- * large numbers of integers when it resizes.
- *
- * The motivation for this class is memory optimization, i.e. space efficient
- * storage of potentially huge arrays without good a-priori size guesses.
- *
- * The API of this class is between a primitive array and an AbstractList. It's
- * not a Collection implementation because it handles primitive types, but the
- * API could be extended to support iterators and the like.
- *
- * NOTE: Like standard Collection implementations/arrays, this class is not
- * synchronized.
- */
-public final class DynamicIntArray {
- static final int DEFAULT_CHUNKSIZE = 8 * 1024;
- static final int INIT_CHUNKS = 128;
-
- private final int chunkSize; // our allocation size
- private int[][] data; // the real data
- private int length; // max set element index +1
- private int initializedChunks = 0; // the number of created chunks
-
- public DynamicIntArray() {
- this(DEFAULT_CHUNKSIZE);
- }
-
- public DynamicIntArray(int chunkSize) {
- this.chunkSize = chunkSize;
-
- data = new int[INIT_CHUNKS][];
- }
-
- /**
- * Ensure that the given index is valid.
- */
- private void grow(int chunkIndex) {
- if (chunkIndex >= initializedChunks) {
- if (chunkIndex >= data.length) {
- int newSize = Math.max(chunkIndex + 1, 2 * data.length);
- int[][] newChunk = new int[newSize][];
- System.arraycopy(data, 0, newChunk, 0, data.length);
- data = newChunk;
- }
- for (int i=initializedChunks; i <= chunkIndex; ++i) {
- data[i] = new int[chunkSize];
- }
- initializedChunks = chunkIndex + 1;
- }
- }
-
- public int get(int index) {
- if (index >= length) {
- throw new IndexOutOfBoundsException("Index " + index +
- " is outside of 0.." +
- (length - 1));
- }
- int i = index / chunkSize;
- int j = index % chunkSize;
- return data[i][j];
- }
-
- public void set(int index, int value) {
- int i = index / chunkSize;
- int j = index % chunkSize;
- grow(i);
- if (index >= length) {
- length = index + 1;
- }
- data[i][j] = value;
- }
-
- public void increment(int index, int value) {
- int i = index / chunkSize;
- int j = index % chunkSize;
- grow(i);
- if (index >= length) {
- length = index + 1;
- }
- data[i][j] += value;
- }
-
- public void add(int value) {
- int i = length / chunkSize;
- int j = length % chunkSize;
- grow(i);
- data[i][j] = value;
- length += 1;
- }
-
- public int size() {
- return length;
- }
-
- public void clear() {
- length = 0;
- for(int i=0; i < data.length; ++i) {
- data[i] = null;
- }
- initializedChunks = 0;
- }
-
- public String toString() {
- int i;
- StringBuilder sb = new StringBuilder(length * 4);
-
- sb.append('{');
- int l = length - 1;
- for (i=0; i<l; i++) {
- sb.append(get(i));
- sb.append(',');
- }
- sb.append(get(i));
- sb.append('}');
-
- return sb.toString();
- }
-
- public int getSizeInBytes() {
- return 4 * initializedChunks * chunkSize;
- }
-}
-
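
Because growth allocates whole chunks rather than copying elements, a sparse high index is cheap to set; a sketch of the chunk arithmetic (numbers illustrative):

    import org.apache.orc.impl.DynamicIntArray;

    public class DynamicIntArrayExample {
      public static void main(String[] args) {
        DynamicIntArray counts = new DynamicIntArray();
        counts.set(100000, 7);        // grows chunk-by-chunk; no element copying
        counts.increment(100000, 3);  // read-modify-write in place
        // with the default 8192-int chunks, index 100000 lives in
        // chunk 100000 / 8192 = 12, slot 100000 % 8192 = 1696
        System.out.println(counts.get(100000));  // prints 10
      }
    }
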
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShims.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims.java b/orc/src/java/org/apache/orc/impl/HadoopShims.java
deleted file mode 100644
index ef7d70f..0000000
--- a/orc/src/java/org/apache/orc/impl/HadoopShims.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.VersionInfo;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-public interface HadoopShims {
-
- enum DirectCompressionType {
- NONE,
- ZLIB_NOHEADER,
- ZLIB,
- SNAPPY,
- }
-
- interface DirectDecompressor {
- void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException;
- }
-
- /**
- * Get a direct decompressor codec, if one is available.
- * @param codec the compression type to decompress
- * @return a direct decompressor, or null if none is available
- */
- DirectDecompressor getDirectDecompressor(DirectCompressionType codec);
-
- /**
- * a hadoop.io ByteBufferPool shim.
- */
- public interface ByteBufferPoolShim {
- /**
- * Get a new ByteBuffer from the pool. The pool can provide this from
- * removing a buffer from its internal cache, or by allocating a
- * new buffer.
- *
- * @param direct Whether the buffer should be direct.
- * @param length The minimum length the buffer will have.
- * @return A new ByteBuffer. Its capacity can be less
- * than what was requested, but must be at
- * least 1 byte.
- */
- ByteBuffer getBuffer(boolean direct, int length);
-
- /**
- * Release a buffer back to the pool.
- * The pool may choose to put this buffer into its cache/free it.
- *
- * @param buffer a direct bytebuffer
- */
- void putBuffer(ByteBuffer buffer);
- }
-
- /**
- * Provides an HDFS ZeroCopyReader shim.
- * @param in the FSDataInputStream to read from (which the cached/mmap buffers are tied to)
- * @param pool the ByteBufferPoolShim to allocate fallback buffers with
- *
- * @return a ZeroCopyReaderShim, or null if not supported
- */
- public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException;
-
- public interface ZeroCopyReaderShim extends Closeable {
- /**
- * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer.
- * Also moves the stream position forward by that amount. The data read can be smaller than maxLength.
- *
- * @return the ByteBuffer read from the stream
- */
- public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException;
- /**
- * Release a ByteBuffer obtained from a read on the stream.
- *
- */
- public void releaseBuffer(ByteBuffer buffer);
-
- /**
- * Close the underlying stream.
- * @throws IOException
- */
- public void close() throws IOException;
- }
- /**
- * Read data into a Text object in the fastest way possible
- */
- public interface TextReaderShim {
- /**
- * @param txt the Text object to read into
- * @param size the number of bytes to read
- * @throws IOException
- */
- void read(Text txt, int size) throws IOException;
- }
-
- /**
- * Wrap a TextReaderShim around an input stream. The reader shim will not
- * buffer any reads from the underlying stream and will only consume bytes
- * which are required for TextReaderShim.read() input.
- */
- public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
-
- class Factory {
- private static HadoopShims SHIMS = null;
-
- public static synchronized HadoopShims get() {
- if (SHIMS == null) {
- String[] versionParts = VersionInfo.getVersion().split("[.]");
- int major = Integer.parseInt(versionParts[0]);
- int minor = Integer.parseInt(versionParts[1]);
- if (major < 2 || (major == 2 && minor < 3)) {
- SHIMS = new HadoopShims_2_2();
- } else {
- SHIMS = new HadoopShimsCurrent();
- }
- }
- return SHIMS;
- }
- }
-}
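
Factory.get() is the single entry point: it parses VersionInfo.getVersion() and hands back the 2.2 shim for Hadoop older than 2.3, otherwise the current shim. A usage sketch:

    import org.apache.orc.impl.HadoopShims;

    public class ShimsExample {
      public static void main(String[] args) {
        HadoopShims shims = HadoopShims.Factory.get();
        HadoopShims.DirectDecompressor zlib =
            shims.getDirectDecompressor(HadoopShims.DirectCompressionType.ZLIB);
        // may be null (the 2.2 shim always returns null); callers must
        // fall back to streaming decompression in that case
        System.out.println(zlib == null ? "no direct decompressor" : "direct zlib available");
      }
    }
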
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
deleted file mode 100644
index 5c53f74..0000000
--- a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
-import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- * Shims for recent versions of Hadoop
- */
-public class HadoopShimsCurrent implements HadoopShims {
-
- private static class DirectDecompressWrapper implements DirectDecompressor {
- private final org.apache.hadoop.io.compress.DirectDecompressor root;
-
- DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) {
- this.root = root;
- }
-
- public void decompress(ByteBuffer input,
- ByteBuffer output) throws IOException {
- root.decompress(input, output);
- }
- }
-
- public DirectDecompressor getDirectDecompressor(
- DirectCompressionType codec) {
- switch (codec) {
- case ZLIB:
- return new DirectDecompressWrapper
- (new ZlibDecompressor.ZlibDirectDecompressor());
- case ZLIB_NOHEADER:
- return new DirectDecompressWrapper
- (new ZlibDecompressor.ZlibDirectDecompressor
- (ZlibDecompressor.CompressionHeader.NO_HEADER, 0));
- case SNAPPY:
- return new DirectDecompressWrapper
- (new SnappyDecompressor.SnappyDirectDecompressor());
- default:
- return null;
- }
- }
-
- @Override
- public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
- ByteBufferPoolShim pool
- ) throws IOException {
- return ZeroCopyShims.getZeroCopyReader(in, pool);
- }
-
- private final class FastTextReaderShim implements TextReaderShim {
- private final DataInputStream din;
-
- public FastTextReaderShim(InputStream in) {
- this.din = new DataInputStream(in);
- }
-
- @Override
- public void read(Text txt, int len) throws IOException {
- txt.readWithKnownLength(din, len);
- }
- }
-
- @Override
- public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
- return new FastTextReaderShim(in);
- }
-
-}
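
The fast text path above is just Text.readWithKnownLength on a DataInputStream, which avoids the intermediate byte[] copy made by the 2.2 shim; a usage sketch (input bytes illustrative):

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.io.Text;
    import org.apache.orc.impl.HadoopShims;
    import org.apache.orc.impl.HadoopShimsCurrent;

    public class TextShimExample {
      public static void main(String[] args) throws Exception {
        byte[] utf8 = "orc".getBytes(StandardCharsets.UTF_8);
        HadoopShims shims = new HadoopShimsCurrent();
        HadoopShims.TextReaderShim reader =
            shims.getTextReaderShim(new ByteArrayInputStream(utf8));
        Text t = new Text();
        reader.read(t, utf8.length);  // delegates to Text.readWithKnownLength
        System.out.println(t);
      }
    }
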
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
deleted file mode 100644
index 3f65e74..0000000
--- a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.io.Text;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Method;
-
-/**
- * Shims for versions of Hadoop up to and including 2.2.x
- */
-public class HadoopShims_2_2 implements HadoopShims {
-
- final boolean zeroCopy;
- final boolean fastRead;
-
- HadoopShims_2_2() {
- boolean zcr = false;
- try {
- Class.forName("org.apache.hadoop.fs.CacheFlag", false,
- HadoopShims_2_2.class.getClassLoader());
- zcr = true;
- } catch (ClassNotFoundException ce) {
- }
- zeroCopy = zcr;
- boolean fastRead = false;
- if (zcr) {
- for (Method m : Text.class.getMethods()) {
- if ("readWithKnownLength".equals(m.getName())) {
- fastRead = true;
- }
- }
- }
- this.fastRead = fastRead;
- }
-
- public DirectDecompressor getDirectDecompressor(
- DirectCompressionType codec) {
- return null;
- }
-
- @Override
- public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
- ByteBufferPoolShim pool
- ) throws IOException {
- if(zeroCopy) {
- return ZeroCopyShims.getZeroCopyReader(in, pool);
- }
- /* not supported */
- return null;
- }
-
- private final class BasicTextReaderShim implements TextReaderShim {
- private final InputStream in;
-
- public BasicTextReaderShim(InputStream in) {
- this.in = in;
- }
-
- @Override
- public void read(Text txt, int len) throws IOException {
- int offset = 0;
- byte[] bytes = new byte[len];
- while (len > 0) {
- int written = in.read(bytes, offset, len);
- if (written < 0) {
- throw new EOFException("Can't finish read from " + in + " read "
- + (offset) + " bytes out of " + bytes.length);
- }
- len -= written;
- offset += written;
- }
- txt.set(bytes);
- }
- }
-
- @Override
- public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
- return new BasicTextReaderShim(in);
- }
-}
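
BasicTextReaderShim.read() is the classic read-fully loop: InputStream.read may return fewer bytes than requested, so it loops until the buffer is full or EOF. The same pattern in isolation (helper name hypothetical):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    public final class ReadFully {
      // Keep reading until len bytes have arrived; read() returning -1 is an error here
      static byte[] readFully(InputStream in, int len) throws IOException {
        byte[] bytes = new byte[len];
        int offset = 0;
        while (len > 0) {
          int n = in.read(bytes, offset, len);
          if (n < 0) {
            throw new EOFException("EOF after " + offset + " of " + bytes.length + " bytes");
          }
          offset += n;
          len -= n;
        }
        return bytes;
      }
    }
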
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
deleted file mode 100644
index 851f645..0000000
--- a/orc/src/java/org/apache/orc/impl/InStream.java
+++ /dev/null
@@ -1,498 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.apache.orc.CompressionCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.common.io.DiskRange;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.CodedInputStream;
-
-public abstract class InStream extends InputStream {
-
- private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
- public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
-
- protected final String name;
- protected long length;
-
- public InStream(String name, long length) {
- this.name = name;
- this.length = length;
- }
-
- public String getStreamName() {
- return name;
- }
-
- public long getStreamLength() {
- return length;
- }
-
- @Override
- public abstract void close();
-
- public static class UncompressedStream extends InStream {
- private List<DiskRange> bytes;
- private long length;
- protected long currentOffset;
- private ByteBuffer range;
- private int currentRange;
-
- public UncompressedStream(String name, List<DiskRange> input, long length) {
- super(name, length);
- reset(input, length);
- }
-
- protected void reset(List<DiskRange> input, long length) {
- this.bytes = input;
- this.length = length;
- currentRange = 0;
- currentOffset = 0;
- range = null;
- }
-
- @Override
- public int read() {
- if (range == null || range.remaining() == 0) {
- if (currentOffset == length) {
- return -1;
- }
- seek(currentOffset);
- }
- currentOffset += 1;
- return 0xff & range.get();
- }
-
- @Override
- public int read(byte[] data, int offset, int length) {
- if (range == null || range.remaining() == 0) {
- if (currentOffset == this.length) {
- return -1;
- }
- seek(currentOffset);
- }
- int actualLength = Math.min(length, range.remaining());
- range.get(data, offset, actualLength);
- currentOffset += actualLength;
- return actualLength;
- }
-
- @Override
- public int available() {
- if (range != null && range.remaining() > 0) {
- return range.remaining();
- }
- return (int) (length - currentOffset);
- }
-
- @Override
- public void close() {
- currentRange = bytes.size();
- currentOffset = length;
- // explicit de-ref of bytes[]
- bytes.clear();
- }
-
- @Override
- public void seek(PositionProvider index) throws IOException {
- seek(index.getNext());
- }
-
- public void seek(long desired) {
- if (desired == 0 && bytes.isEmpty()) {
- logEmptySeek(name);
- return;
- }
- int i = 0;
- for (DiskRange curRange : bytes) {
- if (desired == 0 && curRange.getData().remaining() == 0) {
- logEmptySeek(name);
- return;
- }
- if (curRange.getOffset() <= desired &&
- (desired - curRange.getOffset()) < curRange.getLength()) {
- currentOffset = desired;
- currentRange = i;
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
- }
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
- currentOffset = desired;
- currentRange = segments - 1;
- DiskRange curRange = bytes.get(currentRange);
- this.range = curRange.getData().duplicate();
- int pos = range.position();
- pos += (int)(desired - curRange.getOffset()); // this is why we duplicate
- this.range.position(pos);
- return;
- }
- throw new IllegalArgumentException("Seek in " + name + " to " +
- desired + " is outside of the data");
- }
-
- @Override
- public String toString() {
- return "uncompressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
- }
- }
-
- private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
- // TODO: use the same pool as the ORC readers
- if (isDirect) {
- return ByteBuffer.allocateDirect(size);
- } else {
- return ByteBuffer.allocate(size);
- }
- }
-
- private static class CompressedStream extends InStream {
- private final List<DiskRange> bytes;
- private final int bufferSize;
- private ByteBuffer uncompressed;
- private final CompressionCodec codec;
- private ByteBuffer compressed;
- private long currentOffset;
- private int currentRange;
- private boolean isUncompressedOriginal;
-
- public CompressedStream(String name, List<DiskRange> input, long length,
- CompressionCodec codec, int bufferSize) {
- super(name, length);
- this.bytes = input;
- this.codec = codec;
- this.bufferSize = bufferSize;
- currentOffset = 0;
- currentRange = 0;
- }
-
- private void allocateForUncompressed(int size, boolean isDirect) {
- uncompressed = allocateBuffer(size, isDirect);
- }
-
- private void readHeader() throws IOException {
- if (compressed == null || compressed.remaining() <= 0) {
- seek(currentOffset);
- }
- if (compressed.remaining() > OutStream.HEADER_SIZE) {
- int b0 = compressed.get() & 0xff;
- int b1 = compressed.get() & 0xff;
- int b2 = compressed.get() & 0xff;
- boolean isOriginal = (b0 & 0x01) == 1;
- int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
-
- if (chunkLength > bufferSize) {
- throw new IllegalArgumentException("Buffer size too small. size = " +
- bufferSize + " needed = " + chunkLength);
- }
- // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always
- assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream";
- currentOffset += OutStream.HEADER_SIZE;
-
- ByteBuffer slice = this.slice(chunkLength);
-
- if (isOriginal) {
- uncompressed = slice;
- isUncompressedOriginal = true;
- } else {
- if (isUncompressedOriginal) {
- allocateForUncompressed(bufferSize, slice.isDirect());
- isUncompressedOriginal = false;
- } else if (uncompressed == null) {
- allocateForUncompressed(bufferSize, slice.isDirect());
- } else {
- uncompressed.clear();
- }
- codec.decompress(slice, uncompressed);
- }
- } else {
- throw new IllegalStateException("Can't read header at " + this);
- }
- }
-
- @Override
- public int read() throws IOException {
- if (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == length) {
- return -1;
- }
- readHeader();
- }
- return 0xff & uncompressed.get();
- }
-
- @Override
- public int read(byte[] data, int offset, int length) throws IOException {
- if (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == this.length) {
- return -1;
- }
- readHeader();
- }
- int actualLength = Math.min(length, uncompressed.remaining());
- uncompressed.get(data, offset, actualLength);
- return actualLength;
- }
-
- @Override
- public int available() throws IOException {
- if (uncompressed == null || uncompressed.remaining() == 0) {
- if (currentOffset == length) {
- return 0;
- }
- readHeader();
- }
- return uncompressed.remaining();
- }
-
- @Override
- public void close() {
- uncompressed = null;
- compressed = null;
- currentRange = bytes.size();
- currentOffset = length;
- bytes.clear();
- }
-
- @Override
- public void seek(PositionProvider index) throws IOException {
- seek(index.getNext());
- long uncompressedBytes = index.getNext();
- if (uncompressedBytes != 0) {
- readHeader();
- uncompressed.position(uncompressed.position() +
- (int) uncompressedBytes);
- } else if (uncompressed != null) {
- // mark the uncompressed buffer as done
- uncompressed.position(uncompressed.limit());
- }
- }
-
- /* slices a read only contiguous buffer of chunkLength */
- private ByteBuffer slice(int chunkLength) throws IOException {
- int len = chunkLength;
- final long oldOffset = currentOffset;
- ByteBuffer slice;
- if (compressed.remaining() >= len) {
- slice = compressed.slice();
- // simple case
- slice.limit(len);
- currentOffset += len;
- compressed.position(compressed.position() + len);
- return slice;
- } else if (currentRange >= (bytes.size() - 1)) {
- // nothing has been modified yet
- throw new IOException("EOF in " + this + " while trying to read " +
- chunkLength + " bytes");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format(
- "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)",
- compressed.remaining(), len));
- }
-
- // we need to consolidate 2 or more buffers into 1
- // first copy out compressed buffers
- ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
- currentOffset += compressed.remaining();
- len -= compressed.remaining();
- copy.put(compressed);
- ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
-
- while (len > 0 && iter.hasNext()) {
- ++currentRange;
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
- }
- DiskRange range = iter.next();
- compressed = range.getData().duplicate();
- if (compressed.remaining() >= len) {
- slice = compressed.slice();
- slice.limit(len);
- copy.put(slice);
- currentOffset += len;
- compressed.position(compressed.position() + len);
- return copy;
- }
- currentOffset += compressed.remaining();
- len -= compressed.remaining();
- copy.put(compressed);
- }
-
- // restore offsets for exception clarity
- seek(oldOffset);
- throw new IOException("EOF in " + this + " while trying to read " +
- chunkLength + " bytes");
- }
-
- private void seek(long desired) throws IOException {
- if (desired == 0 && bytes.isEmpty()) {
- logEmptySeek(name);
- return;
- }
- int i = 0;
- for (DiskRange range : bytes) {
- if (range.getOffset() <= desired && desired < range.getEnd()) {
- currentRange = i;
- compressed = range.getData().duplicate();
- int pos = compressed.position();
- pos += (int)(desired - range.getOffset());
- compressed.position(pos);
- currentOffset = desired;
- return;
- }
- ++i;
- }
- // if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.size();
- if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) {
- DiskRange range = bytes.get(segments - 1);
- currentRange = segments - 1;
- compressed = range.getData().duplicate();
- compressed.position(compressed.limit());
- currentOffset = desired;
- return;
- }
- throw new IOException("Seek outside of data in " + this + " to " + desired);
- }
-
- private String rangeString() {
- StringBuilder builder = new StringBuilder();
- int i = 0;
- for (DiskRange range : bytes) {
- if (i != 0) {
- builder.append("; ");
- }
- builder.append(" range " + i + " = " + range.getOffset()
- + " to " + (range.getEnd() - range.getOffset()));
- ++i;
- }
- return builder.toString();
- }
-
- @Override
- public String toString() {
- return "compressed stream " + name + " position: " + currentOffset +
- " length: " + length + " range: " + currentRange +
- " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
- rangeString() +
- (uncompressed == null ? "" :
- " uncompressed: " + uncompressed.position() + " to " +
- uncompressed.limit());
- }
- }
-
- public abstract void seek(PositionProvider index) throws IOException;
-
- private static void logEmptySeek(String name) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Attempting seek into empty stream (" + name + "). Skipping stream.");
- }
- }
-
- /**
- * Create an input stream from a list of buffers.
- * @param streamName the name of the stream
- * @param buffers the list of ranges of bytes for the stream
- * @param offsets a list of offsets (the same length as input) that must
- * contain the first offset of each set of bytes in input
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
- * @return an input stream
- * @throws IOException
- */
- @VisibleForTesting
- @Deprecated
- public static InStream create(String streamName,
- ByteBuffer[] buffers,
- long[] offsets,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
- for (int i = 0; i < buffers.length; ++i) {
- input.add(new BufferChunk(buffers[i], offsets[i]));
- }
- return create(streamName, input, length, codec, bufferSize);
- }
-
- /**
- * Create an input stream from a list of disk ranges with data.
- * @param name the name of the stream
- * @param input the list of ranges of bytes for the stream; from disk or cache
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
- * @return an input stream
- * @throws IOException
- */
- public static InStream create(String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- if (codec == null) {
- return new UncompressedStream(name, input, length);
- } else {
- return new CompressedStream(name, input, length, codec, bufferSize);
- }
- }
-
- /**
- * Creates a coded input stream (used for protobuf message parsing) with a higher message size limit.
- *
- * @param name the name of the stream
- * @param input the list of ranges of bytes for the stream; from disk or cache
- * @param length the length in bytes of the stream
- * @param codec the compression codec
- * @param bufferSize the compression buffer size
- * @return coded input stream
- * @throws IOException
- */
- public static CodedInputStream createCodedInputStream(
- String name,
- List<DiskRange> input,
- long length,
- CompressionCodec codec,
- int bufferSize) throws IOException {
- InStream inStream = create(name, input, length, codec, bufferSize);
- CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream);
- codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT);
- return codedInputStream;
- }
-}
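
The 3-byte chunk header decoded in readHeader() stores (chunkLength << 1) little-endian, with the low bit of the first byte flagging an uncompressed ("original") chunk. A worked decode using the same bit math (length illustrative):

    public class ChunkHeaderDecode {
      public static void main(String[] args) {
        int chunkLength = 100000;
        int header = chunkLength << 1;  // low bit 0 = compressed, 1 = original
        int b0 = header & 0xff, b1 = (header >> 8) & 0xff, b2 = (header >> 16) & 0xff;
        boolean isOriginal = (b0 & 0x01) == 1;
        int decoded = (b2 << 15) | (b1 << 7) | (b0 >> 1);  // same math as readHeader()
        System.out.println(decoded + " original=" + isOriginal);  // 100000 original=false
      }
    }
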
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/IntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java
deleted file mode 100644
index 3e64d54..0000000
--- a/orc/src/java/org/apache/orc/impl/IntegerReader.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-
-/**
- * Interface for reading integers.
- */
-public interface IntegerReader {
-
- /**
- * Seek to the position provided by index.
- * @param index
- * @throws IOException
- */
- void seek(PositionProvider index) throws IOException;
-
- /**
- * Skip the specified number of rows.
- * @param numValues
- * @throws IOException
- */
- void skip(long numValues) throws IOException;
-
- /**
- * Check if there are any more values left.
- * @return true if there are more values to read
- * @throws IOException
- */
- boolean hasNext() throws IOException;
-
- /**
- * Return the next available value.
- * @return the next value
- * @throws IOException
- */
- long next() throws IOException;
-
- /**
- * Return the next available vector for values.
- * @param column the column being read
- * @param data the vector to read into
- * @param length the number of numbers to read
- * @throws IOException
- */
- void nextVector(ColumnVector column,
- long[] data,
- int length
- ) throws IOException;
-
- /**
- * Return the next available vector for values. Does not change the
- * value of column.isRepeating.
- * @param column the column being read
- * @param data the vector to read into
- * @param length the number of numbers to read
- * @throws IOException
- */
- void nextVector(ColumnVector column,
- int[] data,
- int length
- ) throws IOException;
-}
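
A consumer drains the reader with hasNext()/next(); the concrete implementation (e.g. a run-length reader) is constructed elsewhere. A minimal sketch (helper name hypothetical):

    import java.io.IOException;
    import org.apache.orc.impl.IntegerReader;

    public class IntegerReaderExample {
      static long sum(IntegerReader reader) throws IOException {
        long total = 0;
        while (reader.hasNext()) {
          total += reader.next();  // scalar path; nextVector() is the batched path
        }
        return total;
      }
    }
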
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/IntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/IntegerWriter.java b/orc/src/java/org/apache/orc/impl/IntegerWriter.java
deleted file mode 100644
index 419054f..0000000
--- a/orc/src/java/org/apache/orc/impl/IntegerWriter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import java.io.IOException;
-
-/**
- * Interface for writing integers.
- */
-public interface IntegerWriter {
-
- /**
- * Get position from the stream.
- * @param recorder
- * @throws IOException
- */
- void getPosition(PositionRecorder recorder) throws IOException;
-
- /**
- * Write the integer value
- * @param value
- * @throws IOException
- */
- void write(long value) throws IOException;
-
- /**
- * Flush the buffer
- * @throws IOException
- */
- void flush() throws IOException;
-}
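
The writer side mirrors it: write values, then flush any buffered run to the underlying stream. A minimal sketch against the interface (helper name hypothetical):

    import java.io.IOException;
    import org.apache.orc.impl.IntegerWriter;

    public class IntegerWriterExample {
      static void writeAll(IntegerWriter writer, long[] values) throws IOException {
        for (long v : values) {
          writer.write(v);
        }
        writer.flush();  // push any partially-built run to the stream
      }
    }
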
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/MemoryManager.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/MemoryManager.java b/orc/src/java/org/apache/orc/impl/MemoryManager.java
deleted file mode 100644
index 757c0b4..0000000
--- a/orc/src/java/org/apache/orc/impl/MemoryManager.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.orc.OrcConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Implements a memory manager that keeps a global context of how many ORC
- * writers there are and manages the memory between them. For use cases with
- * dynamic partitions, it is easy to end up with many writers in the same task.
- * By managing the size of each allocation, we try to cut down the size of each
- * allocation and keep the task from running out of memory.
- *
- * This class is not thread safe, but is re-entrant - ensure creation and all
- * invocations are triggered from the same thread.
- */
-public class MemoryManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class);
-
- /**
- * How often should we check the memory sizes? Measured in rows added
- * to all of the writers.
- */
- private static final int ROWS_BETWEEN_CHECKS = 5000;
- private final long totalMemoryPool;
- private final Map<Path, WriterInfo> writerList =
- new HashMap<Path, WriterInfo>();
- private long totalAllocation = 0;
- private double currentScale = 1;
- private int rowsAddedSinceCheck = 0;
- private final OwnedLock ownerLock = new OwnedLock();
-
- @SuppressWarnings("serial")
- private static class OwnedLock extends ReentrantLock {
- public Thread getOwner() {
- return super.getOwner();
- }
- }
-
- private static class WriterInfo {
- long allocation;
- Callback callback;
- WriterInfo(long allocation, Callback callback) {
- this.allocation = allocation;
- this.callback = callback;
- }
- }
-
- public interface Callback {
- /**
- * The writer needs to check its memory usage
- * @param newScale the current scale factor for memory allocations
- * @return true if the writer was over the limit
- * @throws IOException
- */
- boolean checkMemory(double newScale) throws IOException;
- }
-
- /**
- * Create the memory manager.
- * @param conf use the configuration to find the maximum size of the memory
- * pool.
- */
- public MemoryManager(Configuration conf) {
- double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf);
- totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean().
- getHeapMemoryUsage().getMax() * maxLoad);
- ownerLock.lock();
- }
-
- /**
- * Light weight thread-safety check for multi-threaded access patterns
- */
- private void checkOwner() {
- if (!ownerLock.isHeldByCurrentThread()) {
- LOG.warn("Owner thread expected {}, got {}",
- ownerLock.getOwner(), Thread.currentThread());
- }
- }
-
- /**
- * Add a new writer's memory allocation to the pool. We use the path
- * as a unique key to ensure that we don't get duplicates.
- * @param path the file that is being written
- * @param requestedAllocation the requested buffer size
- */
- public void addWriter(Path path, long requestedAllocation,
- Callback callback) throws IOException {
- checkOwner();
- WriterInfo oldVal = writerList.get(path);
- // this should always be null, but we handle the case where the memory
- // manager was never told that a previous writer finished and the task
- // starts writing to the same path.
- if (oldVal == null) {
- oldVal = new WriterInfo(requestedAllocation, callback);
- writerList.put(path, oldVal);
- totalAllocation += requestedAllocation;
- } else {
- // handle a new writer that is writing to the same path
- totalAllocation += requestedAllocation - oldVal.allocation;
- oldVal.allocation = requestedAllocation;
- oldVal.callback = callback;
- }
- updateScale(true);
- }
-
- /**
- * Remove the given writer from the pool.
- * @param path the file that has been closed
- */
- public void removeWriter(Path path) throws IOException {
- checkOwner();
- WriterInfo val = writerList.get(path);
- if (val != null) {
- writerList.remove(path);
- totalAllocation -= val.allocation;
- if (writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- updateScale(false);
- }
- if(writerList.isEmpty()) {
- rowsAddedSinceCheck = 0;
- }
- }
-
- /**
- * Get the total pool size that is available for ORC writers.
- * @return the number of bytes in the pool
- */
- public long getTotalMemoryPool() {
- return totalMemoryPool;
- }
-
- /**
- * The scaling factor for each allocation to ensure that the pool isn't
- * oversubscribed.
- * @return a fraction between 0.0 and 1.0 of the requested size that is
- * available for each writer.
- */
- public double getAllocationScale() {
- return currentScale;
- }
-
- /**
- * Give the memory manager an opportunity for doing a memory check.
- * @param rows number of rows added
- * @throws IOException
- */
- public void addedRow(int rows) throws IOException {
- rowsAddedSinceCheck += rows;
- if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
- notifyWriters();
- }
- }
-
- /**
- * Notify all of the writers that they should check their memory usage.
- * @throws IOException
- */
- public void notifyWriters() throws IOException {
- checkOwner();
- LOG.debug("Notifying writers after " + rowsAddedSinceCheck);
- for(WriterInfo writer: writerList.values()) {
- boolean flushed = writer.callback.checkMemory(currentScale);
- if (LOG.isDebugEnabled() && flushed) {
- LOG.debug("flushed " + writer.toString());
- }
- }
- rowsAddedSinceCheck = 0;
- }
-
- /**
- * Update the currentScale based on the current allocation and pool size.
- * This also updates the notificationTrigger.
- * @param isAllocate is this an allocation?
- */
- private void updateScale(boolean isAllocate) throws IOException {
- if (totalAllocation <= totalMemoryPool) {
- currentScale = 1;
- } else {
- currentScale = (double) totalMemoryPool / totalAllocation;
- }
- }
-}
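
updateScale() caps each writer at pool/allocation once the pool is oversubscribed; a worked example of the arithmetic (numbers illustrative):

    public class MemoryScaleExample {
      public static void main(String[] args) {
        long totalMemoryPool = 1L << 30;   // 1 GiB pool
        long totalAllocation = 3L << 29;   // 1.5 GiB requested across all writers
        double currentScale = totalAllocation <= totalMemoryPool
            ? 1.0 : (double) totalMemoryPool / totalAllocation;
        // each writer is granted scale * request: here 2/3 of what it asked for
        System.out.println(currentScale);  // 0.666...
      }
    }
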
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
deleted file mode 100644
index 7ca9e1d..0000000
--- a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.orc.Reader;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-
-public class OrcAcidUtils {
- public static final String ACID_STATS = "hive.acid.stats";
- public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
-
- /**
- * Get the filename of the ORC ACID side file that contains the lengths
- * of the intermediate footers.
- * @param main the main ORC filename
- * @return the name of the side file
- */
- public static Path getSideFile(Path main) {
- return new Path(main + DELTA_SIDE_FILE_SUFFIX);
- }
-
- /**
- * Read the side file to get the last flush length.
- * @param fs the file system to use
- * @param deltaFile the path of the delta file
- * @return the maximum size of the file to use
- * @throws IOException
- */
- public static long getLastFlushLength(FileSystem fs,
- Path deltaFile) throws IOException {
- Path lengths = getSideFile(deltaFile);
- long result = Long.MAX_VALUE;
- if (!fs.exists(lengths)) {
- return result;
- }
- try (FSDataInputStream stream = fs.open(lengths)) {
- result = -1;
- while (stream.available() > 0) {
- result = stream.readLong();
- }
- return result;
- } catch (IOException ioe) {
- // truncated side file: return the last length read successfully
- return result;
- }
- }
-
- private static final Charset utf8 = Charset.forName("UTF-8");
- private static final CharsetDecoder utf8Decoder = utf8.newDecoder();
-
- public static AcidStats parseAcidStats(Reader reader) {
- if (reader.hasMetadataValue(ACID_STATS)) {
- try {
- ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate();
- return new AcidStats(utf8Decoder.decode(val).toString());
- } catch (CharacterCodingException e) {
- throw new IllegalArgumentException("Bad string encoding for " +
- ACID_STATS, e);
- }
- } else {
- return null;
- }
- }
-
-}
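The read side of the _flush_length protocol above, as a hedged sketch: the side file is a sequence of appended longs, getLastFlushLength() returns the last complete one, and Long.MAX_VALUE signals "no side file, the whole delta is readable". The delta path below is illustrative.

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.orc.impl.OrcAcidUtils;

  class SideFileSketch {
    static long usableBytes(Configuration conf, Path delta) throws IOException {
      FileSystem fs = delta.getFileSystem(conf); // standard Hadoop lookup
      long len = OrcAcidUtils.getLastFlushLength(fs, delta);
      // MAX_VALUE: treat the delta as complete; otherwise only bytes
      // [0, len) are guaranteed to end on a committed footer.
      return len;
    }
  }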
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcIndex.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcIndex.java b/orc/src/java/org/apache/orc/impl/OrcIndex.java
deleted file mode 100644
index 50a15f2..0000000
--- a/orc/src/java/org/apache/orc/impl/OrcIndex.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import org.apache.orc.OrcProto;
-
-public final class OrcIndex {
- OrcProto.RowIndex[] rowGroupIndex;
- OrcProto.BloomFilterIndex[] bloomFilterIndex;
-
- public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) {
- this.rowGroupIndex = rgIndex;
- this.bloomFilterIndex = bfIndex;
- }
-
- public OrcProto.RowIndex[] getRowGroupIndex() {
- return rowGroupIndex;
- }
-
- public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
- return bloomFilterIndex;
- }
-
- public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
- this.rowGroupIndex = rowGroupIndex;
- }
-}
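A short sketch of consuming this holder; the proto accessors (getEntryList, getPositionsList) come from ORC's generated protobuf classes, and the OrcIndex instance itself would be produced by the record-reader internals removed elsewhere in this patch.

  import org.apache.orc.OrcProto;
  import org.apache.orc.impl.OrcIndex;

  class IndexDumpSketch {
    static void dumpColumn(OrcIndex index, int column) {
      OrcProto.RowIndex ri = index.getRowGroupIndex()[column];
      for (OrcProto.RowIndexEntry entry : ri.getEntryList()) {
        // one entry per row group: the stream positions used for seeking
        System.out.println(entry.getPositionsList());
      }
    }
  }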
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcTail.java b/orc/src/java/org/apache/orc/impl/OrcTail.java
deleted file mode 100644
index f095603..0000000
--- a/orc/src/java/org/apache/orc/impl/OrcTail.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.orc.impl;
-
-import static org.apache.orc.impl.ReaderImpl.extractMetadata;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.OrcProto;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.StripeStatistics;
-
-// TODO: Make OrcTail implement FileMetadata or Reader interface
-public final class OrcTail {
- // postscript + footer - Serialized in OrcSplit
- private final OrcProto.FileTail fileTail;
- // serialized representation of metadata, footer and postscript
- private final ByteBuffer serializedTail;
- // used to invalidate cache entries
- private final long fileModificationTime;
- // lazily deserialized
- private OrcProto.Metadata metadata;
-
- public OrcTail(OrcProto.FileTail fileTail, ByteBuffer serializedTail) {
- this(fileTail, serializedTail, -1);
- }
-
- public OrcTail(OrcProto.FileTail fileTail, ByteBuffer serializedTail, long fileModificationTime) {
- this.fileTail = fileTail;
- this.serializedTail = serializedTail;
- this.fileModificationTime = fileModificationTime;
- this.metadata = null;
- }
-
- public ByteBuffer getSerializedTail() {
- return serializedTail;
- }
-
- public long getFileModificationTime() {
- return fileModificationTime;
- }
-
- public OrcProto.Footer getFooter() {
- return fileTail.getFooter();
- }
-
- public OrcProto.PostScript getPostScript() {
- return fileTail.getPostscript();
- }
-
- public OrcFile.WriterVersion getWriterVersion() {
- OrcProto.PostScript ps = fileTail.getPostscript();
- return (ps.hasWriterVersion()
- ? OrcFile.WriterVersion.from(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL);
- }
-
- public List<StripeInformation> getStripes() {
- List<StripeInformation> result = new ArrayList<>(fileTail.getFooter().getStripesCount());
- for (OrcProto.StripeInformation stripeProto : fileTail.getFooter().getStripesList()) {
- result.add(new ReaderImpl.StripeInformationImpl(stripeProto));
- }
- return result;
- }
-
- public CompressionKind getCompressionKind() {
- return CompressionKind.valueOf(fileTail.getPostscript().getCompression().name());
- }
-
- public CompressionCodec getCompressionCodec() {
- return PhysicalFsWriter.createCodec(getCompressionKind());
- }
-
- public int getCompressionBufferSize() {
- return (int) fileTail.getPostscript().getCompressionBlockSize();
- }
-
- public List<StripeStatistics> getStripeStatistics() throws IOException {
- List<StripeStatistics> result = new ArrayList<>();
- List<OrcProto.StripeStatistics> ssProto = getStripeStatisticsProto();
- if (ssProto != null) {
- for (OrcProto.StripeStatistics ss : ssProto) {
- result.add(new StripeStatistics(ss.getColStatsList()));
- }
- }
- return result;
- }
-
- public List<OrcProto.StripeStatistics> getStripeStatisticsProto() throws IOException {
- if (serializedTail == null) return null;
- if (metadata == null) {
- metadata = extractMetadata(serializedTail, 0,
- (int) fileTail.getPostscript().getMetadataLength(),
- getCompressionCodec(), getCompressionBufferSize());
- // clear does not clear the contents but sets position to 0 and limit = capacity
- serializedTail.clear();
- }
- return metadata.getStripeStatsList();
- }
-
- public int getMetadataSize() {
- return (int) getPostScript().getMetadataLength();
- }
-
- public List<OrcProto.Type> getTypes() {
- return getFooter().getTypesList();
- }
-
- public OrcProto.FileTail getFileTail() {
- return fileTail;
- }
-
- public OrcProto.FileTail getMinimalFileTail() {
- OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder(fileTail);
- OrcProto.Footer.Builder footerBuilder = OrcProto.Footer.newBuilder(fileTail.getFooter());
- footerBuilder.clearStatistics();
- fileTailBuilder.setFooter(footerBuilder.build());
- OrcProto.FileTail result = fileTailBuilder.build();
- return result;
- }
-}
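One hedged example of what getMinimalFileTail() is for: when a tail is cached or embedded in a split, the per-column statistics in the footer are dead weight, so they are cleared before serialization. A sketch of the serialization step, using the standard protobuf toByteArray():

  import org.apache.orc.OrcProto;
  import org.apache.orc.impl.OrcTail;

  class TailForSplitSketch {
    static byte[] serialize(OrcTail tail) {
      OrcProto.FileTail minimal = tail.getMinimalFileTail(); // footer w/o stats
      return minimal.toByteArray(); // protobuf wire format, small enough to cache
    }
  }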
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OutStream.java b/orc/src/java/org/apache/orc/impl/OutStream.java
deleted file mode 100644
index 81662cc..0000000
--- a/orc/src/java/org/apache/orc/impl/OutStream.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.orc.impl;
-
-import org.apache.orc.CompressionCodec;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class OutStream extends PositionedOutputStream {
-
- public interface OutputReceiver {
- /**
- * Output the given buffer to the final destination
- * @param buffer the buffer to output
- * @throws IOException
- */
- void output(ByteBuffer buffer) throws IOException;
- }
-
- public static final int HEADER_SIZE = 3;
- private final String name;
- private final OutputReceiver receiver;
- // if set, the stream will be suppressed when the stripe is written
- private boolean suppress;
-
- /**
- * Stores the uncompressed bytes that have been serialized, but not
- * compressed yet. When this fills, we compress the entire buffer.
- */
- private ByteBuffer current = null;
-
- /**
- * Stores the compressed bytes until we have a full buffer and then outputs
- * them to the receiver. If no compression is being done, this (and overflow)
- * will always be null and the current buffer will be sent directly to the
- * receiver.
- */
- private ByteBuffer compressed = null;
-
- /**
- * Since the compressed buffer may start with contents from previous
- * compression blocks, we allocate an overflow buffer so that the
- * output of the codec can be split between the two buffers. After the
- * compressed buffer is sent to the receiver, the overflow buffer becomes
- * the new compressed buffer.
- */
- private ByteBuffer overflow = null;
- private final int bufferSize;
- private final CompressionCodec codec;
- private long compressedBytes = 0;
- private long uncompressedBytes = 0;
-
- public OutStream(String name,
- int bufferSize,
- CompressionCodec codec,
- OutputReceiver receiver) throws IOException {
- this.name = name;
- this.bufferSize = bufferSize;
- this.codec = codec;
- this.receiver = receiver;
- this.suppress = false;
- }
-
- public void clear() throws IOException {
- flush();
- suppress = false;
- }
-
- /**
- * Write the length of the compressed bytes. Life is much easier if the
- * header is constant length, so just use 3 bytes. Considering most of the
- * codecs want between 32k (snappy) and 256k (lzo, zlib) buffers, the
- * 23 bits of length this leaves are plenty. The low bit records whether
- * the chunk holds the original or the compressed bytes.
- * @param buffer the buffer to write the header to
- * @param position the position in the buffer to write at
- * @param val the chunk length in bytes
- * @param original is it uncompressed
- */
- private static void writeHeader(ByteBuffer buffer,
- int position,
- int val,
- boolean original) {
- buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
- buffer.put(position + 1, (byte) (val >> 7));
- buffer.put(position + 2, (byte) (val >> 15));
- }
-
- private void getNewInputBuffer() throws IOException {
- if (codec == null) {
- current = ByteBuffer.allocate(bufferSize);
- } else {
- current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
- writeHeader(current, 0, bufferSize, true);
- current.position(HEADER_SIZE);
- }
- }
-
- /**
- * Allocate a new output buffer; only called when we are compressing.
- */
- private ByteBuffer getNewOutputBuffer() throws IOException {
- return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
- }
-
- private void flip() throws IOException {
- current.limit(current.position());
- current.position(codec == null ? 0 : HEADER_SIZE);
- }
-
- @Override
- public void write(int i) throws IOException {
- if (current == null) {
- getNewInputBuffer();
- }
- if (current.remaining() < 1) {
- spill();
- }
- uncompressedBytes += 1;
- current.put((byte) i);
- }
-
- @Override
- public void write(byte[] bytes, int offset, int length) throws IOException {
- if (current == null) {
- getNewInputBuffer();
- }
- int remaining = Math.min(current.remaining(), length);
- current.put(bytes, offset, remaining);
- uncompressedBytes += remaining;
- length -= remaining;
- while (length != 0) {
- spill();
- offset += remaining;
- remaining = Math.min(current.remaining(), length);
- current.put(bytes, offset, remaining);
- uncompressedBytes += remaining;
- length -= remaining;
- }
- }
-
- private void spill() throws IOException {
- // if there isn't anything in the current buffer, don't spill
- if (current == null ||
- current.position() == (codec == null ? 0 : HEADER_SIZE)) {
- return;
- }
- flip();
- if (codec == null) {
- receiver.output(current);
- getNewInputBuffer();
- } else {
- if (compressed == null) {
- compressed = getNewOutputBuffer();
- } else if (overflow == null) {
- overflow = getNewOutputBuffer();
- }
- int sizePosn = compressed.position();
- compressed.position(compressed.position() + HEADER_SIZE);
- if (codec.compress(current, compressed, overflow)) {
- uncompressedBytes = 0;
- // move position back to after the header
- current.position(HEADER_SIZE);
- current.limit(current.capacity());
- // find the total bytes in the chunk
- int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
- if (overflow != null) {
- totalBytes += overflow.position();
- }
- compressedBytes += totalBytes + HEADER_SIZE;
- writeHeader(compressed, sizePosn, totalBytes, false);
- // if we have less than the next header left, spill it.
- if (compressed.remaining() < HEADER_SIZE) {
- compressed.flip();
- receiver.output(compressed);
- compressed = overflow;
- overflow = null;
- }
- } else {
- compressedBytes += uncompressedBytes + HEADER_SIZE;
- uncompressedBytes = 0;
- // we are using the original, but need to spill the current
- // compressed buffer first. So back up to where we started,
- // flip it and send it to the receiver.
- if (sizePosn != 0) {
- compressed.position(sizePosn);
- compressed.flip();
- receiver.output(compressed);
- compressed = null;
- // if we have an overflow, clear it and make it the new compress
- // buffer
- if (overflow != null) {
- overflow.clear();
- compressed = overflow;
- overflow = null;
- }
- } else {
- compressed.clear();
- if (overflow != null) {
- overflow.clear();
- }
- }
-
- // now send the current buffer to the receiver and get a new one.
- current.position(0);
- // update the header with the current length
- writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
- receiver.output(current);
- getNewInputBuffer();
- }
- }
- }
-
- @Override
- public void getPosition(PositionRecorder recorder) throws IOException {
- if (codec == null) {
- recorder.addPosition(uncompressedBytes);
- } else {
- recorder.addPosition(compressedBytes);
- recorder.addPosition(uncompressedBytes);
- }
- }
-
- @Override
- public void flush() throws IOException {
- spill();
- if (compressed != null && compressed.position() != 0) {
- compressed.flip();
- receiver.output(compressed);
- }
- compressed = null;
- uncompressedBytes = 0;
- compressedBytes = 0;
- overflow = null;
- current = null;
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- @Override
- public long getBufferSize() {
- long result = 0;
- if (current != null) {
- result += current.capacity();
- }
- if (compressed != null) {
- result += compressed.capacity();
- }
- if (overflow != null) {
- result += overflow.capacity();
- }
- return result;
- }
-
- /**
- * Set the suppress flag: the stream will be omitted when the stripe
- * is written.
- */
- public void suppress() {
- suppress = true;
- }
-
- /**
- * Returns the state of the suppress flag.
- * @return value of the suppress flag
- */
- public boolean isSuppressed() {
- return suppress;
- }
-}
-
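To make the 3-byte chunk header described in writeHeader() concrete, a standalone decoder sketch (not part of the original file): the header stores a 23-bit length shifted left by one, low bit set for an uncompressed ("original") chunk, in little-endian byte order.

  class ChunkHeaderSketch {
    // Inverse of OutStream.writeHeader(): returns {length, originalFlag}.
    static int[] decode(byte b0, byte b1, byte b2) {
      int combined = (b0 & 0xff) | ((b1 & 0xff) << 8) | ((b2 & 0xff) << 16);
      return new int[]{combined >>> 1, combined & 1};
    }
    // Example: writeHeader(buf, 0, 262144, false) stores 0x00 0x00 0x08,
    // and decode((byte) 0, (byte) 0, (byte) 8) yields {262144, 0}.
  }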