Posted to commits@hive.apache.org by om...@apache.org on 2017/07/19 16:58:53 UTC
[30/37] hive git commit: HIVE-17118. Move the hive-orc source files
to make the package names unique.
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java b/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 0000000..47c33bb
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.orc.CompressionCodec;
+import org.apache.hive.orc.CompressionCodec.Modifier;
+import org.apache.hive.orc.CompressionKind;
+import org.apache.hive.orc.OrcFile;
+import org.apache.hive.orc.OrcFile.CompressionStrategy;
+import org.apache.hive.orc.OrcProto;
+import org.apache.hive.orc.OrcProto.BloomFilterIndex;
+import org.apache.hive.orc.OrcProto.Footer;
+import org.apache.hive.orc.OrcProto.Metadata;
+import org.apache.hive.orc.OrcProto.PostScript;
+import org.apache.hive.orc.OrcProto.Stream.Kind;
+import org.apache.hive.orc.OrcProto.StripeFooter;
+import org.apache.hive.orc.OrcProto.StripeInformation;
+import org.apache.hive.orc.OrcProto.RowIndex.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
+
+public class PhysicalFsWriter implements PhysicalWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf outStream wrapped around the metadata outStream
+ private CodedOutputStream protobufWriter = null;
+
+ private final FileSystem fs;
+ private final Path path;
+ private final long blockSize;
+ private final int bufferSize;
+ private final CompressionCodec codec;
+ private final double paddingTolerance;
+ private final long defaultStripeSize;
+ private final CompressionKind compress;
+ private final boolean addBlockPadding;
+ private final CompressionStrategy compressionStrategy;
+
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private long adjustedStripeSize;
+ private long headerLength;
+ private long stripeStart;
+ private int metadataLength;
+ private int footerLength;
+
+ public PhysicalFsWriter(FileSystem fs, Path path, int numColumns, OrcFile.WriterOptions opts) {
+ this.fs = fs;
+ this.path = path;
+ this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
+ this.addBlockPadding = opts.getBlockPadding();
+ if (opts.isEnforceBufferSize()) {
+ this.bufferSize = opts.getBufferSize();
+ } else {
+ this.bufferSize = getEstimatedBufferSize(defaultStripeSize, numColumns, opts.getBufferSize());
+ }
+ this.compress = opts.getCompress();
+ this.compressionStrategy = opts.getCompressionStrategy();
+ codec = createCodec(compress);
+ this.paddingTolerance = opts.getPaddingTolerance();
+ this.blockSize = opts.getBlockSize();
+ LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+ " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+ compress, bufferSize);
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ if (rawWriter != null) return;
+ rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+ fs.getDefaultReplication(path), blockSize);
+ rawWriter.writeBytes(OrcFile.MAGIC);
+ headerLength = rawWriter.getPos();
+ writer = new OutStream("metadata", bufferSize, codec,
+ new DirectStream(rawWriter));
+ protobufWriter = CodedOutputStream.newInstance(writer);
+ }
+
+ private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
+ this.stripeStart = rawWriter.getPos();
+ final long currentStripeSize = indexSize + dataSize + footerSize;
+ final long available = blockSize - (stripeStart % blockSize);
+ final long overflow = currentStripeSize - adjustedStripeSize;
+ final float availRatio = (float) available / (float) defaultStripeSize;
+
+ if (availRatio > 0.0f && availRatio < 1.0f
+ && availRatio > paddingTolerance) {
+ // adjust default stripe size to fit into remaining space, also adjust
+ // the next stripe for correction based on the current stripe size
+ // and user specified padding tolerance. Since stripe size can overflow
+ // the default stripe size we should apply this correction to avoid
+ // writing portion of last stripe to next hdfs block.
+ double correction = overflow > 0 ? (double) overflow
+ / (double) adjustedStripeSize : 0.0;
+
+ // correction should not be greater than user specified padding
+ // tolerance
+ correction = correction > paddingTolerance ? paddingTolerance
+ : correction;
+
+ // adjust next stripe size based on current stripe estimate correction
+ adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
+ } else if (availRatio >= 1.0) {
+ adjustedStripeSize = defaultStripeSize;
+ }
+
+ if (availRatio < paddingTolerance && addBlockPadding) {
+ long padding = blockSize - (stripeStart % blockSize);
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+ LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
+ padding, availRatio, defaultStripeSize));
+ stripeStart += padding;
+ while (padding > 0) {
+ int writeLen = (int) Math.min(padding, pad.length);
+ rawWriter.write(pad, 0, writeLen);
+ padding -= writeLen;
+ }
+ adjustedStripeSize = defaultStripeSize;
+ } else if (currentStripeSize < blockSize
+ && (stripeStart % blockSize) + currentStripeSize > blockSize) {
+ // even if you don't pad, reset the default stripe size when crossing a
+ // block boundary
+ adjustedStripeSize = defaultStripeSize;
+ }
+ }
+
+ /**
+ * An output receiver that writes the ByteBuffers to the output stream
+ * as they are received.
+ */
+ private class DirectStream implements OutStream.OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ }
+ }
+
+ @Override
+ public long getPhysicalStripeSize() {
+ return adjustedStripeSize;
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
+
+ public static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (loader == null) {
+ loader = WriterImpl.class.getClassLoader();
+ }
+ @SuppressWarnings("unchecked")
+ Class<? extends CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO", e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
+ }
+ }
+
+ private void writeStripeFooter(StripeFooter footer, long dataSize, long indexSize,
+ StripeInformation.Builder dirEntry) throws IOException {
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ dirEntry.setOffset(stripeStart);
+ dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize);
+ }
+
+ @VisibleForTesting
+ public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+ int bs) {
+ // The worst case is that there are 2 big streams per column and
+ // we want to guarantee that each stream gets ~10 buffers.
+ // This keeps buffers small enough that we don't get really small stripe
+ // sizes.
+ int estBufferSize = (int) (stripeSize / (20 * numColumns));
+ estBufferSize = getClosestBufferSize(estBufferSize);
+ return estBufferSize > bs ? bs : estBufferSize;
+ }
+
+ private static int getClosestBufferSize(int estBufferSize) {
+ final int kb4 = 4 * 1024;
+ final int kb8 = 8 * 1024;
+ final int kb16 = 16 * 1024;
+ final int kb32 = 32 * 1024;
+ final int kb64 = 64 * 1024;
+ final int kb128 = 128 * 1024;
+ final int kb256 = 256 * 1024;
+ if (estBufferSize <= kb4) {
+ return kb4;
+ } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+ return kb8;
+ } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+ return kb16;
+ } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+ return kb32;
+ } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+ return kb64;
+ } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+ return kb128;
+ } else {
+ return kb256;
+ }
+ }
+
+ @Override
+ public void writeFileMetadata(Metadata.Builder builder) throws IOException {
+ long startPosn = rawWriter.getPos();
+ Metadata metadata = builder.build();
+ metadata.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ this.metadataLength = (int) (rawWriter.getPos() - startPosn);
+ }
+
+ @Override
+ public void writeFileFooter(Footer.Builder builder) throws IOException {
+ long bodyLength = rawWriter.getPos() - metadataLength;
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ long startPosn = rawWriter.getPos();
+ Footer footer = builder.build();
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ this.footerLength = (int) (rawWriter.getPos() - startPosn);
+ }
+
+ @Override
+ public void writePostScript(PostScript.Builder builder) throws IOException {
+ builder.setCompression(writeCompressionKind(compress));
+ builder.setFooterLength(footerLength);
+ builder.setMetadataLength(metadataLength);
+ if (compress != CompressionKind.NONE) {
+ builder.setCompressionBlockSize(bufferSize);
+ }
+ PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosn = rawWriter.getPos();
+ ps.writeTo(rawWriter);
+ long length = rawWriter.getPos() - startPosn;
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too large at " + length);
+ }
+ rawWriter.writeByte((int)length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ rawWriter.close();
+ }
+
+ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+ switch (kind) {
+ case NONE: return OrcProto.CompressionKind.NONE;
+ case ZLIB: return OrcProto.CompressionKind.ZLIB;
+ case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+ case LZO: return OrcProto.CompressionKind.LZO;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + kind);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ rawWriter.hflush();
+ // TODO: reset?
+ }
+
+ @Override
+ public long getRawWriterPosition() throws IOException {
+ return rawWriter.getPos();
+ }
+
+ @Override
+ public void appendRawStripe(byte[] stripe, int offset, int length,
+ StripeInformation.Builder dirEntry) throws IOException {
+ long start = rawWriter.getPos();
+ long availBlockSpace = blockSize - (start % blockSize);
+
+ // see if stripe can fit in the current hdfs block, else pad the remaining
+ // space in the block
+ if (length < blockSize && length > availBlockSpace &&
+ addBlockPadding) {
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+ LOG.info(String.format("Padding ORC by %d bytes while merging..",
+ availBlockSpace));
+ start += availBlockSpace;
+ while (availBlockSpace > 0) {
+ int writeLen = (int) Math.min(availBlockSpace, pad.length);
+ rawWriter.write(pad, 0, writeLen);
+ availBlockSpace -= writeLen;
+ }
+ }
+
+ rawWriter.write(stripe);
+ dirEntry.setOffset(start);
+ }
+
+
+ /**
+ * This class is used to hold the contents of streams as they are buffered.
+ * The TreeWriters write to the outStream and the codec compresses the
+ * data as buffers fill up and stores them in the output list. When the
+ * stripe is being written, the whole stream is written to the file.
+ */
+ private class BufferedStream implements OutStream.OutputReceiver {
+ private final OutStream outStream;
+ private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+ BufferedStream(String name, int bufferSize,
+ CompressionCodec codec) throws IOException {
+ outStream = new OutStream(name, bufferSize, codec, this);
+ }
+
+ /**
+ * Receive a buffer from the compression codec.
+ * @param buffer the buffer to save
+ */
+ @Override
+ public void output(ByteBuffer buffer) {
+ output.add(buffer);
+ }
+
+ /**
+ * @return the number of bytes in buffers that are allocated to this stream.
+ */
+ public long getBufferSize() {
+ long result = 0;
+ for (ByteBuffer buf: output) {
+ result += buf.capacity();
+ }
+ return outStream.getBufferSize() + result;
+ }
+
+ /**
+ * Write any saved buffers to the OutputStream if needed, and clears all the buffers.
+ */
+ public void spillToDiskAndClear() throws IOException {
+ if (!outStream.isSuppressed()) {
+ for (ByteBuffer buffer: output) {
+ rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+ outStream.clear();
+ output.clear();
+ }
+
+ /**
+ * @return The number of bytes that will be written to the output. Assumes the stream writing
+ * into this receiver has already been flushed.
+ */
+ public long getOutputSize() {
+ long result = 0;
+ for (ByteBuffer buffer: output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return outStream.toString();
+ }
+ }
+
+ @Override
+ public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+ BufferedStream result = streams.get(name);
+ if (result == null) {
+ EnumSet<Modifier> modifiers = createCompressionModifiers(name.getKind());
+ result = new BufferedStream(name.toString(), bufferSize,
+ codec == null ? null : codec.modify(modifiers));
+ streams.put(name, result);
+ }
+ return result.outStream;
+ }
+
+ private EnumSet<Modifier> createCompressionModifiers(Kind kind) {
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ return EnumSet.of(Modifier.TEXT,
+ compressionStrategy == CompressionStrategy.SPEED ? Modifier.FAST : Modifier.DEFAULT);
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ return EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY);
+ default:
+ LOG.warn("Missing ORC compression modifiers for " + kind);
+ return null;
+ }
+ }
+
+ @Override
+ public void finalizeStripe(StripeFooter.Builder footerBuilder,
+ StripeInformation.Builder dirEntry) throws IOException {
+ long indexSize = 0;
+ long dataSize = 0;
+ for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+ BufferedStream receiver = pair.getValue();
+ OutStream outStream = receiver.outStream;
+ if (!outStream.isSuppressed()) {
+ outStream.flush();
+ long streamSize = receiver.getOutputSize();
+ StreamName name = pair.getKey();
+ footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+ .setKind(name.getKind()).setLength(streamSize));
+ if (StreamName.Area.INDEX == name.getArea()) {
+ indexSize += streamSize;
+ } else {
+ dataSize += streamSize;
+ }
+ }
+ }
+ dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+
+ OrcProto.StripeFooter footer = footerBuilder.build();
+ // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+ padStripe(indexSize, dataSize, footer.getSerializedSize());
+
+ // write out the data streams
+ for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+ pair.getValue().spillToDiskAndClear();
+ }
+ // Write out the footer.
+ writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+ }
+
+ @Override
+ public long estimateMemory() {
+ long result = 0;
+ for (BufferedStream stream: streams.values()) {
+ result += stream.getBufferSize();
+ }
+ return result;
+ }
+
+ @Override
+ public void writeIndexStream(StreamName name, Builder rowIndex) throws IOException {
+ OutStream stream = getOrCreatePhysicalStream(name);
+ rowIndex.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void writeBloomFilterStream(
+ StreamName name, BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+ OutStream stream = getOrCreatePhysicalStream(name);
+ bloomFilterIndex.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @VisibleForTesting
+ public OutputStream getStream() throws IOException {
+ initialize();
+ return rawWriter;
+ }
+}
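
For orientation, a minimal standalone sketch of the buffer-size estimate above: one stripe is split across roughly 20 buffers per column (two large streams at ~10 buffers each), rounded up to the next power-of-two bucket between 4KB and 256KB, then capped by the requested buffer size. The class and method names here are illustrative only, not part of the ORC API.

    public class BufferSizeSketch {
      // Round an estimate up to the next power-of-two bucket in [4KB, 256KB],
      // mirroring getClosestBufferSize above.
      static int closestBufferSize(int estBufferSize) {
        int size = 4 * 1024;
        while (size < estBufferSize && size < 256 * 1024) {
          size <<= 1;
        }
        return size;
      }

      static int estimatedBufferSize(long stripeSize, int numColumns, int maxBufferSize) {
        int est = (int) (stripeSize / (20L * numColumns));
        return Math.min(closestBufferSize(est), maxBufferSize);
      }

      public static void main(String[] args) {
        // A 64MB stripe with 10 columns estimates ~327KB per buffer,
        // which rounds up and is then capped to 256KB.
        System.out.println(estimatedBufferSize(64L * 1024 * 1024, 10, 256 * 1024));
      }
    }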
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java
new file mode 100644
index 0000000..dc0089a
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PhysicalWriter.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+import org.apache.hive.orc.OrcProto.BloomFilterIndex;
+import org.apache.hive.orc.OrcProto.Footer;
+import org.apache.hive.orc.OrcProto.Metadata;
+import org.apache.hive.orc.OrcProto.PostScript;
+import org.apache.hive.orc.OrcProto.RowIndex;
+import org.apache.hive.orc.OrcProto.StripeFooter;
+import org.apache.hive.orc.OrcProto.StripeInformation;
+
+public interface PhysicalWriter {
+
+ /**
+ * Creates all the streams/connections/etc. necessary to write.
+ */
+ void initialize() throws IOException;
+
+ /**
+ * Writes out the file metadata.
+ * @param builder Metadata builder to finalize and write.
+ */
+ void writeFileMetadata(Metadata.Builder builder) throws IOException;
+
+ /**
+ * Writes out the file footer.
+ * @param builder Footer builder to finalize and write.
+ */
+ void writeFileFooter(Footer.Builder builder) throws IOException;
+
+ /**
+ * Writes out the postscript (including the size byte if needed).
+ * @param builder Postscript builder to finalize and write.
+ */
+ void writePostScript(PostScript.Builder builder) throws IOException;
+
+ /**
+ * Creates a physical stream to write data to.
+ * @param name Stream name.
+ * @return The output stream.
+ */
+ OutStream getOrCreatePhysicalStream(StreamName name) throws IOException;
+
+ /**
+ * Flushes the data in all the streams, spills them to disk, and writes out the stripe footer.
+ * @param footer Stripe footer to be updated with relevant data and written out.
+ * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+ */
+ void finalizeStripe(StripeFooter.Builder footer,
+ StripeInformation.Builder dirEntry) throws IOException;
+
+ /**
+ * Writes out the index for the stripe column.
+ * @param name Stream name.
+ * @param rowIndex Row index entries to write.
+ */
+ void writeIndexStream(StreamName name, RowIndex.Builder rowIndex) throws IOException;
+
+ /**
+ * Writes out the bloom filter for the stripe column.
+ * @param streamName Stream name.
+ * @param bloomFilterIndex Bloom filter index to write.
+ */
+ void writeBloomFilterStream(StreamName streamName,
+ BloomFilterIndex.Builder bloomFilterIndex) throws IOException;
+
+ /**
+ * Closes the writer.
+ */
+ void close() throws IOException;
+
+ /**
+ * Force-flushes the writer.
+ */
+ void flush() throws IOException;
+
+ /**
+ * @return the physical writer position (e.g. for updater).
+ */
+ long getRawWriterPosition() throws IOException;
+
+ /** @return physical stripe size, taking padding into account. */
+ long getPhysicalStripeSize();
+
+ /** @return whether the writer is compressed. */
+ boolean isCompressed();
+
+ /**
+ * Appends raw stripe data (e.g. for file merger).
+ * @param stripe Stripe data buffer.
+ * @param offset Stripe data buffer offset.
+ * @param length Stripe data buffer length.
+ * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+ * @throws IOException
+ */
+ void appendRawStripe(byte[] stripe, int offset, int length,
+ StripeInformation.Builder dirEntry) throws IOException;
+
+ /**
+ * @return the estimated memory usage for the stripe.
+ */
+ long estimateMemory();
+}
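
As a rough orientation, a hypothetical sketch of the tail-writing call order a caller is expected to follow against this interface (the real caller is WriterImpl, which interleaves stripe writes before these steps); the helper class name and the omitted stripe loop are assumptions, not part of this patch:

    import java.io.IOException;

    import org.apache.hive.orc.OrcProto;
    import org.apache.hive.orc.impl.PhysicalWriter;

    class PhysicalWriterCallOrderSketch {
      static void writeTail(PhysicalWriter writer,
                            OrcProto.Metadata.Builder metadata,
                            OrcProto.Footer.Builder footer,
                            OrcProto.PostScript.Builder postScript) throws IOException {
        // Stripes are assumed to have been written already via
        // getOrCreatePhysicalStream(...) and sealed with finalizeStripe(...).
        writer.writeFileMetadata(metadata);   // stripe-level statistics
        writer.writeFileFooter(footer);       // schema and stripe directory
        writer.writePostScript(postScript);   // section lengths plus the trailing size byte
        writer.close();
      }
    }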
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java b/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java
new file mode 100644
index 0000000..36c2654
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PositionProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+/**
+ * An interface used for seeking to a row index.
+ */
+public interface PositionProvider {
+ long getNext();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java b/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java
new file mode 100644
index 0000000..11eb9cc
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PositionRecorder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+/**
+ * An interface for recording positions in a stream.
+ */
+public interface PositionRecorder {
+ void addPosition(long offset);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java b/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java
new file mode 100644
index 0000000..1b8f7c2
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/PositionedOutputStream.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class PositionedOutputStream extends OutputStream {
+
+ /**
+ * Record the current position to the recorder.
+ * @param recorder the object that receives the position
+ * @throws IOException
+ */
+ public abstract void getPosition(PositionRecorder recorder
+ ) throws IOException;
+
+ /**
+ * Get the memory size currently allocated as buffer associated with this
+ * stream.
+ * @return the number of bytes used by buffers.
+ */
+ public abstract long getBufferSize();
+}
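
PositionRecorder and PositionProvider are two halves of the same contract: positions recorded while writing a stream are replayed, in the same order, when seeking during reads. A toy list-backed pairing of the two interfaces, purely illustrative (the real implementations serialize positions into the protobuf row index):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hive.orc.impl.PositionProvider;
    import org.apache.hive.orc.impl.PositionRecorder;

    class ListPositions implements PositionRecorder, PositionProvider {
      private final List<Long> positions = new ArrayList<>();
      private int next = 0;

      @Override
      public void addPosition(long offset) {  // writer side: record
        positions.add(offset);
      }

      @Override
      public long getNext() {                 // reader side: replay in order
        return positions.get(next++);
      }
    }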
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java
new file mode 100644
index 0000000..8ab9e92
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/ReaderImpl.java
@@ -0,0 +1,763 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hive.orc.ColumnStatistics;
+import org.apache.hive.orc.CompressionCodec;
+import org.apache.hive.orc.FileFormatException;
+import org.apache.hive.orc.FileMetadata;
+import org.apache.hive.orc.OrcFile;
+import org.apache.hive.orc.CompressionKind;
+import org.apache.hive.orc.OrcUtils;
+import org.apache.hive.orc.Reader;
+import org.apache.hive.orc.RecordReader;
+import org.apache.hive.orc.TypeDescription;
+import org.apache.hive.orc.StripeInformation;
+import org.apache.hive.orc.StripeStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.orc.OrcProto;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.CodedInputStream;
+
+public class ReaderImpl implements Reader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
+
+ private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
+
+ protected final FileSystem fileSystem;
+ private final long maxLength;
+ protected final Path path;
+ protected final CompressionKind compressionKind;
+ protected CompressionCodec codec;
+ protected int bufferSize;
+ protected OrcProto.Metadata metadata;
+ private List<OrcProto.StripeStatistics> stripeStats;
+ private final int metadataSize;
+ protected final List<OrcProto.Type> types;
+ private TypeDescription schema;
+ private final List<OrcProto.UserMetadataItem> userMetadata;
+ private final List<OrcProto.ColumnStatistics> fileStats;
+ private final List<StripeInformation> stripes;
+ protected final int rowIndexStride;
+ private final long contentLength, numberOfRows;
+
+ private long deserializedSize = -1;
+ protected final Configuration conf;
+ private final List<Integer> versionList;
+ private final OrcFile.WriterVersion writerVersion;
+
+ protected OrcTail tail;
+
+ public static class StripeInformationImpl
+ implements StripeInformation {
+ private final OrcProto.StripeInformation stripe;
+
+ public StripeInformationImpl(OrcProto.StripeInformation stripe) {
+ this.stripe = stripe;
+ }
+
+ @Override
+ public long getOffset() {
+ return stripe.getOffset();
+ }
+
+ @Override
+ public long getLength() {
+ return stripe.getDataLength() + getIndexLength() + getFooterLength();
+ }
+
+ @Override
+ public long getDataLength() {
+ return stripe.getDataLength();
+ }
+
+ @Override
+ public long getFooterLength() {
+ return stripe.getFooterLength();
+ }
+
+ @Override
+ public long getIndexLength() {
+ return stripe.getIndexLength();
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return stripe.getNumberOfRows();
+ }
+
+ @Override
+ public String toString() {
+ return "offset: " + getOffset() + " data: " + getDataLength() +
+ " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
+ " index: " + getIndexLength();
+ }
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return numberOfRows;
+ }
+
+ @Override
+ public List<String> getMetadataKeys() {
+ List<String> result = new ArrayList<String>();
+ for(OrcProto.UserMetadataItem item: userMetadata) {
+ result.add(item.getName());
+ }
+ return result;
+ }
+
+ @Override
+ public ByteBuffer getMetadataValue(String key) {
+ for(OrcProto.UserMetadataItem item: userMetadata) {
+ if (item.hasName() && item.getName().equals(key)) {
+ return item.getValue().asReadOnlyByteBuffer();
+ }
+ }
+ throw new IllegalArgumentException("Can't find user metadata " + key);
+ }
+
+ public boolean hasMetadataValue(String key) {
+ for(OrcProto.UserMetadataItem item: userMetadata) {
+ if (item.hasName() && item.getName().equals(key)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public CompressionKind getCompressionKind() {
+ return compressionKind;
+ }
+
+ @Override
+ public int getCompressionSize() {
+ return bufferSize;
+ }
+
+ @Override
+ public List<StripeInformation> getStripes() {
+ return stripes;
+ }
+
+ @Override
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public List<OrcProto.Type> getTypes() {
+ return types;
+ }
+
+ @Override
+ public OrcFile.Version getFileVersion() {
+ for (OrcFile.Version version: OrcFile.Version.values()) {
+ if ((versionList != null && !versionList.isEmpty()) &&
+ version.getMajor() == versionList.get(0) &&
+ version.getMinor() == versionList.get(1)) {
+ return version;
+ }
+ }
+ return OrcFile.Version.V_0_11;
+ }
+
+ @Override
+ public OrcFile.WriterVersion getWriterVersion() {
+ return writerVersion;
+ }
+
+ @Override
+ public OrcProto.FileTail getFileTail() {
+ return tail.getFileTail();
+ }
+
+ @Override
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ @Override
+ public ColumnStatistics[] getStatistics() {
+ ColumnStatistics[] result = new ColumnStatistics[types.size()];
+ for(int i=0; i < result.length; ++i) {
+ result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));
+ }
+ return result;
+ }
+
+ @Override
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ /**
+ * Ensure this is an ORC file to prevent users from trying to read text
+ * files or RC files as ORC files.
+ * @param in the file being read
+ * @param path the filename for error messages
+ * @param psLen the postscript length
+ * @param buffer the tail of the file
+ * @throws IOException
+ */
+ protected static void ensureOrcFooter(FSDataInputStream in,
+ Path path,
+ int psLen,
+ ByteBuffer buffer) throws IOException {
+ int magicLength = OrcFile.MAGIC.length();
+ int fullLength = magicLength + 1;
+ if (psLen < fullLength || buffer.remaining() < fullLength) {
+ throw new FileFormatException("Malformed ORC file " + path +
+ ". Invalid postscript length " + psLen);
+ }
+ int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
+ byte[] array = buffer.array();
+ // now look for the magic string at the end of the postscript.
+ if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
+ // If it isn't there, this may be the 0.11.0 version of ORC.
+ // Read the first 3 bytes of the file to check for the header
+ byte[] header = new byte[magicLength];
+ in.readFully(0, header, 0, magicLength);
+ // if it isn't there, this isn't an ORC file
+ if (!Text.decode(header, 0, magicLength).equals(OrcFile.MAGIC)) {
+ throw new FileFormatException("Malformed ORC file " + path +
+ ". Invalid postscript.");
+ }
+ }
+ }
+
+ /**
+ * Ensure this is an ORC file to prevent users from trying to read text
+ * files or RC files as ORC files.
+ * @param psLen the postscript length
+ * @param buffer the tail of the file
+ * @throws IOException
+ */
+ protected static void ensureOrcFooter(ByteBuffer buffer, int psLen) throws IOException {
+ int magicLength = OrcFile.MAGIC.length();
+ int fullLength = magicLength + 1;
+ if (psLen < fullLength || buffer.remaining() < fullLength) {
+ throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
+ }
+
+ int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
+ byte[] array = buffer.array();
+ // now look for the magic string at the end of the postscript.
+ if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
+ // if it isn't there, this may be the 0.11.0 version of ORC.
+ // Read the first 3 bytes from the buffer to check for the header
+ if (!Text.decode(buffer.array(), 0, magicLength).equals(OrcFile.MAGIC)) {
+ throw new FileFormatException("Malformed ORC file. Invalid postscript length " + psLen);
+ }
+ }
+ }
+
+ /**
+ * Build a version string out of an array.
+ * @param version the version number as a list
+ * @return the human readable form of the version string
+ */
+ private static String versionString(List<Integer> version) {
+ StringBuilder buffer = new StringBuilder();
+ for(int i=0; i < version.size(); ++i) {
+ if (i != 0) {
+ buffer.append('.');
+ }
+ buffer.append(version.get(i));
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * Check to see if this ORC file is from a future version and if so,
+ * warn the user that we may not be able to read all of the column encodings.
+ * @param log the logger to write any error message to
+ * @param path the data source path for error messages
+ * @param version the version of hive that wrote the file.
+ */
+ protected static void checkOrcVersion(Logger log, Path path,
+ List<Integer> version) {
+ if (version.size() >= 1) {
+ int major = version.get(0);
+ int minor = 0;
+ if (version.size() >= 2) {
+ minor = version.get(1);
+ }
+ if (major > OrcFile.Version.CURRENT.getMajor() ||
+ (major == OrcFile.Version.CURRENT.getMajor() &&
+ minor > OrcFile.Version.CURRENT.getMinor())) {
+ log.warn(path + " was written by a future Hive version " +
+ versionString(version) +
+ ". This file may not be readable by this version of Hive.");
+ }
+ }
+ }
+
+ /**
+ * Constructor that let's the user specify additional options.
+ * @param path pathname for file
+ * @param options options for reading
+ * @throws IOException
+ */
+ public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
+ FileSystem fs = options.getFilesystem();
+ if (fs == null) {
+ fs = path.getFileSystem(options.getConfiguration());
+ }
+ this.fileSystem = fs;
+ this.path = path;
+ this.conf = options.getConfiguration();
+ this.maxLength = options.getMaxLength();
+ FileMetadata fileMetadata = options.getFileMetadata();
+ if (fileMetadata != null) {
+ this.compressionKind = fileMetadata.getCompressionKind();
+ this.bufferSize = fileMetadata.getCompressionBufferSize();
+ this.codec = PhysicalFsWriter.createCodec(compressionKind);
+ this.metadataSize = fileMetadata.getMetadataSize();
+ this.stripeStats = fileMetadata.getStripeStats();
+ this.versionList = fileMetadata.getVersionList();
+ this.writerVersion =
+ OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum());
+ this.types = fileMetadata.getTypes();
+ this.rowIndexStride = fileMetadata.getRowIndexStride();
+ this.contentLength = fileMetadata.getContentLength();
+ this.numberOfRows = fileMetadata.getNumberOfRows();
+ this.fileStats = fileMetadata.getFileStats();
+ this.stripes = fileMetadata.getStripes();
+ this.userMetadata = null; // not cached and not needed here
+ } else {
+ OrcTail orcTail = options.getOrcTail();
+ if (orcTail == null) {
+ tail = extractFileTail(fs, path, options.getMaxLength());
+ options.orcTail(tail);
+ } else {
+ tail = orcTail;
+ }
+ this.compressionKind = tail.getCompressionKind();
+ this.codec = tail.getCompressionCodec();
+ this.bufferSize = tail.getCompressionBufferSize();
+ this.metadataSize = tail.getMetadataSize();
+ this.versionList = tail.getPostScript().getVersionList();
+ this.types = tail.getFooter().getTypesList();
+ this.rowIndexStride = tail.getFooter().getRowIndexStride();
+ this.contentLength = tail.getFooter().getContentLength();
+ this.numberOfRows = tail.getFooter().getNumberOfRows();
+ this.userMetadata = tail.getFooter().getMetadataList();
+ this.fileStats = tail.getFooter().getStatisticsList();
+ this.writerVersion = tail.getWriterVersion();
+ this.stripes = tail.getStripes();
+ this.stripeStats = tail.getStripeStatisticsProto();
+ }
+ this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0);
+ }
+
+ /**
+ * Get the WriterVersion based on the ORC file postscript.
+ * @param writerVersion the integer writer version
+ * @return the version of the software that produced the file
+ */
+ public static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
+ for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
+ if (version.getId() == writerVersion) {
+ return version;
+ }
+ }
+ return OrcFile.WriterVersion.FUTURE;
+ }
+
+ private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
+ int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(footerAbsPos);
+ bb.limit(footerAbsPos + footerSize);
+ return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
+ Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
+ }
+
+ public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
+ int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
+ bb.position(metadataAbsPos);
+ bb.limit(metadataAbsPos + metadataSize);
+ return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
+ Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
+ }
+
+ private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
+ int psLen, int psAbsOffset) throws IOException {
+ // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+ assert bb.hasArray();
+ CodedInputStream in = CodedInputStream.newInstance(
+ bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
+ OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
+ checkOrcVersion(LOG, path, ps.getVersionList());
+
+ // Check compression codec.
+ switch (ps.getCompression()) {
+ case NONE:
+ break;
+ case ZLIB:
+ break;
+ case SNAPPY:
+ break;
+ case LZO:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression");
+ }
+ return ps;
+ }
+
+ public static OrcTail extractFileTail(ByteBuffer buffer)
+ throws IOException {
+ return extractFileTail(buffer, -1, -1);
+ }
+
+ public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long modificationTime)
+ throws IOException {
+ int readSize = buffer.limit();
+ int psLen = buffer.get(readSize - 1) & 0xff;
+ int psOffset = readSize - 1 - psLen;
+ ensureOrcFooter(buffer, psLen);
+ byte[] psBuffer = new byte[psLen];
+ System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
+ OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
+ int footerSize = (int) ps.getFooterLength();
+ CompressionCodec codec = PhysicalFsWriter
+ .createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+ OrcProto.Footer footer = extractFooter(buffer,
+ (int) (buffer.position() + ps.getMetadataLength()),
+ footerSize, codec, (int) ps.getCompressionBlockSize());
+ OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder()
+ .setPostscriptLength(psLen)
+ .setPostscript(ps)
+ .setFooter(footer)
+ .setFileLength(fileLength);
+ // clear does not clear the contents but sets position to 0 and limit = capacity
+ buffer.clear();
+ return new OrcTail(fileTailBuilder.build(), buffer.slice(), modificationTime);
+ }
+
+ protected OrcTail extractFileTail(FileSystem fs, Path path,
+ long maxFileLength) throws IOException {
+ FSDataInputStream file = fs.open(path);
+ ByteBuffer buffer;
+ OrcProto.PostScript ps;
+ OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
+ long modificationTime;
+ try {
+ // figure out the size of the file using the option or filesystem
+ long size;
+ if (maxFileLength == Long.MAX_VALUE) {
+ FileStatus fileStatus = fs.getFileStatus(path);
+ size = fileStatus.getLen();
+ modificationTime = fileStatus.getModificationTime();
+ } else {
+ size = maxFileLength;
+ modificationTime = -1;
+ }
+ fileTailBuilder.setFileLength(size);
+
+ //read last bytes into buffer to get PostScript
+ int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+ buffer = ByteBuffer.allocate(readSize);
+ assert buffer.position() == 0;
+ file.readFully((size - readSize),
+ buffer.array(), buffer.arrayOffset(), readSize);
+ buffer.position(0);
+
+ //read the PostScript
+ //get length of PostScript
+ int psLen = buffer.get(readSize - 1) & 0xff;
+ ensureOrcFooter(file, path, psLen, buffer);
+ int psOffset = readSize - 1 - psLen;
+ ps = extractPostScript(buffer, path, psLen, psOffset);
+ bufferSize = (int) ps.getCompressionBlockSize();
+ codec = PhysicalFsWriter.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+ fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
+
+ int footerSize = (int) ps.getFooterLength();
+ int metadataSize = (int) ps.getMetadataLength();
+
+ //check if extra bytes need to be read
+ int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
+ int tailSize = 1 + psLen + footerSize + metadataSize;
+ if (extra > 0) {
+ //more bytes need to be read, seek back to the right place and read extra bytes
+ ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+ file.readFully((size - readSize - extra), extraBuf.array(),
+ extraBuf.arrayOffset() + extraBuf.position(), extra);
+ extraBuf.position(extra);
+ //append with already read bytes
+ extraBuf.put(buffer);
+ buffer = extraBuf;
+ buffer.position(0);
+ buffer.limit(tailSize);
+ readSize += extra;
+ psOffset = readSize - 1 - psLen;
+ } else {
+ //footer is already in the bytes in buffer, just adjust position, length
+ buffer.position(psOffset - footerSize - metadataSize);
+ buffer.limit(buffer.position() + tailSize);
+ }
+
+ buffer.mark();
+ int footerOffset = psOffset - footerSize;
+ buffer.position(footerOffset);
+ ByteBuffer footerBuffer = buffer.slice();
+ buffer.reset();
+ OrcProto.Footer footer = extractFooter(footerBuffer, 0, footerSize,
+ codec, bufferSize);
+ fileTailBuilder.setFooter(footer);
+ } finally {
+ try {
+ file.close();
+ } catch (IOException ex) {
+ LOG.error("Failed to close the file after another error", ex);
+ }
+ }
+
+ ByteBuffer serializedTail = ByteBuffer.allocate(buffer.remaining());
+ serializedTail.put(buffer.slice());
+ serializedTail.rewind();
+ return new OrcTail(fileTailBuilder.build(), serializedTail, modificationTime);
+ }
+
+ @Override
+ public ByteBuffer getSerializedFileFooter() {
+ return tail.getSerializedTail();
+ }
+
+ @Override
+ public RecordReader rows() throws IOException {
+ return rows(new Options());
+ }
+
+ @Override
+ public RecordReader rows(Options options) throws IOException {
+ LOG.info("Reading ORC rows from " + path + " with " + options);
+ return new RecordReaderImpl(this, options);
+ }
+
+
+ @Override
+ public long getRawDataSize() {
+ // if the deserializedSize is not computed, then compute it, else
+ // return the already computed size. since we are reading from the footer
+ // we don't have to compute deserialized size repeatedly
+ if (deserializedSize == -1) {
+ List<Integer> indices = Lists.newArrayList();
+ for (int i = 0; i < fileStats.size(); ++i) {
+ indices.add(i);
+ }
+ deserializedSize = getRawDataSizeFromColIndices(indices);
+ }
+ return deserializedSize;
+ }
+
+ @Override
+ public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
+ return getRawDataSizeFromColIndices(colIndices, types, fileStats);
+ }
+
+ public static long getRawDataSizeFromColIndices(
+ List<Integer> colIndices, List<OrcProto.Type> types,
+ List<OrcProto.ColumnStatistics> stats) {
+ long result = 0;
+ for (int colIdx : colIndices) {
+ result += getRawDataSizeOfColumn(colIdx, types, stats);
+ }
+ return result;
+ }
+
+ private static long getRawDataSizeOfColumn(int colIdx, List<OrcProto.Type> types,
+ List<OrcProto.ColumnStatistics> stats) {
+ OrcProto.ColumnStatistics colStat = stats.get(colIdx);
+ long numVals = colStat.getNumberOfValues();
+ OrcProto.Type type = types.get(colIdx);
+
+ switch (type.getKind()) {
+ case BINARY:
+ // the old ORC format doesn't support binary statistics. Checking for binary
+ // statistics is not required as Protocol Buffers takes care of it.
+ return colStat.getBinaryStatistics().getSum();
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ // the old ORC format doesn't support sum for string statistics. Checking for
+ // existence is not required as Protocol Buffers takes care of it.
+
+ // ORC strings are deserialized to Java strings, so use the Java data
+ // model's string size.
+ numVals = numVals == 0 ? 1 : numVals;
+ int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals);
+ return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen);
+ case TIMESTAMP:
+ return numVals * JavaDataModel.get().lengthOfTimestamp();
+ case DATE:
+ return numVals * JavaDataModel.get().lengthOfDate();
+ case DECIMAL:
+ return numVals * JavaDataModel.get().lengthOfDecimal();
+ case DOUBLE:
+ case LONG:
+ return numVals * JavaDataModel.get().primitive2();
+ case FLOAT:
+ case INT:
+ case SHORT:
+ case BOOLEAN:
+ case BYTE:
+ return numVals * JavaDataModel.get().primitive1();
+ default:
+ LOG.debug("Unknown primitive category: " + type.getKind());
+ break;
+ }
+
+ return 0;
+ }
+
+ @Override
+ public long getRawDataSizeOfColumns(List<String> colNames) {
+ List<Integer> colIndices = getColumnIndicesFromNames(colNames);
+ return getRawDataSizeFromColIndices(colIndices);
+ }
+
+ private List<Integer> getColumnIndicesFromNames(List<String> colNames) {
+ // top level struct
+ OrcProto.Type type = types.get(0);
+ List<Integer> colIndices = Lists.newArrayList();
+ List<String> fieldNames = type.getFieldNamesList();
+ int fieldIdx;
+ for (String colName : colNames) {
+ if (fieldNames.contains(colName)) {
+ fieldIdx = fieldNames.indexOf(colName);
+ } else {
+ String s = "Cannot find field for: " + colName + " in ";
+ for (String fn : fieldNames) {
+ s += fn + ", ";
+ }
+ LOG.warn(s);
+ continue;
+ }
+
+ // a single field may span multiple columns. find start and end column
+ // index for the requested field
+ int idxStart = type.getSubtypes(fieldIdx);
+
+ int idxEnd;
+
+ // if the specified field is the last field, then the end index will be
+ // the last column index
+ if (fieldIdx + 1 > fieldNames.size() - 1) {
+ idxEnd = getLastIdx() + 1;
+ } else {
+ idxEnd = type.getSubtypes(fieldIdx + 1);
+ }
+
+ // if start index and end index are same then the field is a primitive
+ // field else complex field (like map, list, struct, union)
+ if (idxStart == idxEnd) {
+ // simple field
+ colIndices.add(idxStart);
+ } else {
+ // a complex field spans multiple columns
+ for (int i = idxStart; i < idxEnd; i++) {
+ colIndices.add(i);
+ }
+ }
+ }
+ return colIndices;
+ }
+
+ private int getLastIdx() {
+ Set<Integer> indices = new HashSet<>();
+ for (OrcProto.Type type : types) {
+ indices.addAll(type.getSubtypesList());
+ }
+ return Collections.max(indices);
+ }
+
+ @Override
+ public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
+ return stripeStats;
+ }
+
+ @Override
+ public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() {
+ return fileStats;
+ }
+
+ @Override
+ public List<StripeStatistics> getStripeStatistics() throws IOException {
+ if (stripeStats == null && metadata == null) {
+ metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize);
+ stripeStats = metadata.getStripeStatsList();
+ }
+ List<StripeStatistics> result = new ArrayList<>();
+ for (OrcProto.StripeStatistics ss : stripeStats) {
+ result.add(new StripeStatistics(ss.getColStatsList()));
+ }
+ return result;
+ }
+
+ public List<OrcProto.UserMetadataItem> getOrcProtoUserMetadata() {
+ return userMetadata;
+ }
+
+ @Override
+ public List<Integer> getVersionList() {
+ return versionList;
+ }
+
+ @Override
+ public int getMetadataSize() {
+ return metadataSize;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("ORC Reader(");
+ buffer.append(path);
+ if (maxLength != -1) {
+ buffer.append(", ");
+ buffer.append(maxLength);
+ }
+ buffer.append(")");
+ return buffer.toString();
+ }
+}
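
A minimal sketch of the tail probe performed by extractFileTail above, using a plain RandomAccessFile in place of an FSDataInputStream: read up to 16KB (DIRECTORY_SIZE_GUESS) from the end of the file, then take the unsigned last byte as the PostScript length. The class name is illustrative, and the footer/metadata re-read and all validation are omitted:

    import java.io.IOException;
    import java.io.RandomAccessFile;

    class TailProbeSketch {
      static int postScriptLength(String path) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
          long size = file.length();
          int readSize = (int) Math.min(size, 16 * 1024);  // DIRECTORY_SIZE_GUESS
          byte[] buffer = new byte[readSize];
          file.seek(size - readSize);
          file.readFully(buffer);
          return buffer[readSize - 1] & 0xff;  // unsigned PostScript length
        }
      }
    }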