Posted to commits@hive.apache.org by se...@apache.org on 2016/12/08 19:07:37 UTC
[1/2] hive git commit: HIVE-14453 : refactor physical writing of ORC data and metadata to FS from the logical writers (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 84b7fc5bd -> 777477f25
HIVE-14453 : refactor physical writing of ORC data and metadata to FS from the logical writers (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/65d8fae0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/65d8fae0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/65d8fae0
Branch: refs/heads/master
Commit: 65d8fae07034e640f13e5b5c88db57204d4e0787
Parents: 84b7fc5
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Dec 8 11:04:42 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Dec 8 11:04:42 2016 -0800
----------------------------------------------------------------------
.../llap/io/decode/OrcEncodedDataConsumer.java | 3 +-
orc/src/java/org/apache/orc/impl/OrcTail.java | 2 +-
.../org/apache/orc/impl/PhysicalFsWriter.java | 529 ++++++++++++++++++
.../org/apache/orc/impl/PhysicalWriter.java | 123 +++++
.../java/org/apache/orc/impl/ReaderImpl.java | 6 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 2 +-
.../java/org/apache/orc/impl/WriterImpl.java | 535 ++-----------------
.../org/apache/orc/impl/TestOrcWideTable.java | 12 +-
.../hadoop/hive/ql/io/orc/WriterImpl.java | 8 +
.../hive/ql/io/orc/TestInputOutputFormat.java | 25 +-
10 files changed, 743 insertions(+), 502 deletions(-)
----------------------------------------------------------------------
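One API move runs through the whole diffstat: the static createCodec factory migrates from WriterImpl to the new PhysicalFsWriter, and the reader-side call sites below (OrcEncodedDataConsumer, OrcTail, ReaderImpl, RecordReaderUtils) are repointed to it. A minimal illustrative caller, assuming only the relocated factory from this patch (the wrapper class itself is made up):

import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.impl.PhysicalFsWriter;

public class CodecFactorySketch {
  public static void main(String[] args) {
    // Relocated factory: this was WriterImpl.createCodec before this commit.
    CompressionCodec zlib = PhysicalFsWriter.createCodec(CompressionKind.ZLIB);
    CompressionCodec none = PhysicalFsWriter.createCodec(CompressionKind.NONE); // null means uncompressed
    System.out.println(zlib + " / " + none);
  }
}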
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 2cb7f79..29f1ba8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.TreeReaderFactory;
import org.apache.orc.impl.TreeReaderFactory.StructTreeReader;
import org.apache.orc.impl.TreeReaderFactory.TreeReader;
@@ -85,7 +86,7 @@ public class OrcEncodedDataConsumer
assert fileMetadata == null;
fileMetadata = f;
stripes = new OrcStripeMetadata[f.getStripes().size()];
- codec = WriterImpl.createCodec(fileMetadata.getCompressionKind());
+ codec = PhysicalFsWriter.createCodec(fileMetadata.getCompressionKind());
}
public void setStripeMetadata(OrcStripeMetadata m) {
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/OrcTail.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/OrcTail.java b/orc/src/java/org/apache/orc/impl/OrcTail.java
index b5f85fb..f095603 100644
--- a/orc/src/java/org/apache/orc/impl/OrcTail.java
+++ b/orc/src/java/org/apache/orc/impl/OrcTail.java
@@ -87,7 +87,7 @@ public final class OrcTail {
}
public CompressionCodec getCompressionCodec() {
- return WriterImpl.createCodec(getCompressionKind());
+ return PhysicalFsWriter.createCodec(getCompressionKind());
}
public int getCompressionBufferSize() {
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java
new file mode 100644
index 0000000..ba8c13f
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionCodec.Modifier;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcFile.CompressionStrategy;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.Footer;
+import org.apache.orc.OrcProto.Metadata;
+import org.apache.orc.OrcProto.PostScript;
+import org.apache.orc.OrcProto.Stream.Kind;
+import org.apache.orc.OrcProto.StripeFooter;
+import org.apache.orc.OrcProto.StripeInformation;
+import org.apache.orc.OrcProto.RowIndex.Builder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.CodedOutputStream;
+
+public class PhysicalFsWriter implements PhysicalWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf outStream around streamFactory
+ private CodedOutputStream protobufWriter = null;
+
+ private final FileSystem fs;
+ private final Path path;
+ private final long blockSize;
+ private final int bufferSize;
+ private final CompressionCodec codec;
+ private final double paddingTolerance;
+ private final long defaultStripeSize;
+ private final CompressionKind compress;
+ private final boolean addBlockPadding;
+ private final CompressionStrategy compressionStrategy;
+
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private long adjustedStripeSize;
+ private long headerLength;
+ private long stripeStart;
+ private int metadataLength;
+ private int footerLength;
+
+ public PhysicalFsWriter(FileSystem fs, Path path, int numColumns, OrcFile.WriterOptions opts) {
+ this.fs = fs;
+ this.path = path;
+ this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize();
+ this.addBlockPadding = opts.getBlockPadding();
+ if (opts.isEnforceBufferSize()) {
+ this.bufferSize = opts.getBufferSize();
+ } else {
+ this.bufferSize = getEstimatedBufferSize(defaultStripeSize, numColumns, opts.getBufferSize());
+ }
+ this.compress = opts.getCompress();
+ this.compressionStrategy = opts.getCompressionStrategy();
+ codec = createCodec(compress);
+ this.paddingTolerance = opts.getPaddingTolerance();
+ this.blockSize = opts.getBlockSize();
+ LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+ " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+ compress, bufferSize);
+ }
+
+ @Override
+ public void initialize() throws IOException {
+ if (rawWriter != null) return;
+ rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+ fs.getDefaultReplication(path), blockSize);
+ rawWriter.writeBytes(OrcFile.MAGIC);
+ headerLength = rawWriter.getPos();
+ writer = new OutStream("metadata", bufferSize, codec,
+ new DirectStream(rawWriter));
+ protobufWriter = CodedOutputStream.newInstance(writer);
+ }
+
+ private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
+ this.stripeStart = rawWriter.getPos();
+ final long currentStripeSize = indexSize + dataSize + footerSize;
+ final long available = blockSize - (stripeStart % blockSize);
+ final long overflow = currentStripeSize - adjustedStripeSize;
+ final float availRatio = (float) available / (float) defaultStripeSize;
+
+ if (availRatio > 0.0f && availRatio < 1.0f
+ && availRatio > paddingTolerance) {
+ // adjust default stripe size to fit into remaining space, also adjust
+ // the next stripe for correction based on the current stripe size
+ // and user specified padding tolerance. Since stripe size can overflow
+ // the default stripe size we should apply this correction to avoid
+ // writing portion of last stripe to next hdfs block.
+ double correction = overflow > 0 ? (double) overflow
+ / (double) adjustedStripeSize : 0.0;
+
+ // correction should not be greater than user specified padding
+ // tolerance
+ correction = correction > paddingTolerance ? paddingTolerance
+ : correction;
+
+ // adjust next stripe size based on current stripe estimate correction
+ adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
+ } else if (availRatio >= 1.0) {
+ adjustedStripeSize = defaultStripeSize;
+ }
+
+ if (availRatio < paddingTolerance && addBlockPadding) {
+ long padding = blockSize - (stripeStart % blockSize);
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+ LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
+ padding, availRatio, defaultStripeSize));
+ stripeStart += padding;
+ while (padding > 0) {
+ int writeLen = (int) Math.min(padding, pad.length);
+ rawWriter.write(pad, 0, writeLen);
+ padding -= writeLen;
+ }
+ adjustedStripeSize = defaultStripeSize;
+ } else if (currentStripeSize < blockSize
+ && (stripeStart % blockSize) + currentStripeSize > blockSize) {
+ // even if you don't pad, reset the default stripe size when crossing a
+ // block boundary
+ adjustedStripeSize = defaultStripeSize;
+ }
+ }
+
+ /**
+ * An output receiver that writes the ByteBuffers to the output stream
+ * as they are received.
+ */
+ private class DirectStream implements OutStream.OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+ }
+ }
+
+ @Override
+ public long getPhysicalStripeSize() {
+ return adjustedStripeSize;
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
+
+ public static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (loader == null) {
+ loader = WriterImpl.class.getClassLoader();
+ }
+ @SuppressWarnings("unchecked")
+ Class<? extends CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO", e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
+ }
+ }
+
+ private void writeStripeFooter(StripeFooter footer, long dataSize, long indexSize,
+ StripeInformation.Builder dirEntry) throws IOException {
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ dirEntry.setOffset(stripeStart);
+ dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize);
+ }
+
+ @VisibleForTesting
+ public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+ int bs) {
+ // The worst case is that there are 2 big streams per column and
+ // we want to guarantee that each stream gets ~10 buffers.
+ // This keeps buffers small enough that we don't get really small stripe
+ // sizes.
+ int estBufferSize = (int) (stripeSize / (20 * numColumns));
+ estBufferSize = getClosestBufferSize(estBufferSize);
+ return estBufferSize > bs ? bs : estBufferSize;
+ }
+
+ private static int getClosestBufferSize(int estBufferSize) {
+ final int kb4 = 4 * 1024;
+ final int kb8 = 8 * 1024;
+ final int kb16 = 16 * 1024;
+ final int kb32 = 32 * 1024;
+ final int kb64 = 64 * 1024;
+ final int kb128 = 128 * 1024;
+ final int kb256 = 256 * 1024;
+ if (estBufferSize <= kb4) {
+ return kb4;
+ } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+ return kb8;
+ } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+ return kb16;
+ } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+ return kb32;
+ } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+ return kb64;
+ } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+ return kb128;
+ } else {
+ return kb256;
+ }
+ }
+
+ @Override
+ public void writeFileMetadata(Metadata.Builder builder) throws IOException {
+ long startPosn = rawWriter.getPos();
+ Metadata metadata = builder.build();
+ metadata.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ this.metadataLength = (int) (rawWriter.getPos() - startPosn);
+ }
+
+ @Override
+ public void writeFileFooter(Footer.Builder builder) throws IOException {
+ long bodyLength = rawWriter.getPos() - metadataLength;
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ long startPosn = rawWriter.getPos();
+ Footer footer = builder.build();
+ footer.writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ this.footerLength = (int) (rawWriter.getPos() - startPosn);
+ }
+
+ @Override
+ public void writePostScript(PostScript.Builder builder) throws IOException {
+ builder.setCompression(writeCompressionKind(compress));
+ builder.setFooterLength(footerLength);
+ builder.setMetadataLength(metadataLength);
+ if (compress != CompressionKind.NONE) {
+ builder.setCompressionBlockSize(bufferSize);
+ }
+ PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosn = rawWriter.getPos();
+ ps.writeTo(rawWriter);
+ long length = rawWriter.getPos() - startPosn;
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too large at " + length);
+ }
+ rawWriter.writeByte((int)length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ rawWriter.close();
+ }
+
+ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+ switch (kind) {
+ case NONE: return OrcProto.CompressionKind.NONE;
+ case ZLIB: return OrcProto.CompressionKind.ZLIB;
+ case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+ case LZO: return OrcProto.CompressionKind.LZO;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + kind);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ rawWriter.hflush();
+ // TODO: reset?
+ }
+
+ @Override
+ public long getRawWriterPosition() throws IOException {
+ return rawWriter.getPos();
+ }
+
+ @Override
+ public void appendRawStripe(byte[] stripe, int offset, int length,
+ StripeInformation.Builder dirEntry) throws IOException {
+ long start = rawWriter.getPos();
+ long availBlockSpace = blockSize - (start % blockSize);
+
+ // see if stripe can fit in the current hdfs block, else pad the remaining
+ // space in the block
+ if (length < blockSize && length > availBlockSpace &&
+ addBlockPadding) {
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+ LOG.info(String.format("Padding ORC by %d bytes while merging..",
+ availBlockSpace));
+ start += availBlockSpace;
+ while (availBlockSpace > 0) {
+ int writeLen = (int) Math.min(availBlockSpace, pad.length);
+ rawWriter.write(pad, 0, writeLen);
+ availBlockSpace -= writeLen;
+ }
+ }
+
+ rawWriter.write(stripe);
+ dirEntry.setOffset(start);
+ }
+
+
+ /**
+ * This class is used to hold the contents of streams as they are buffered.
+ * The TreeWriters write to the outStream and the codec compresses the
+ * data as buffers fill up and stores them in the output list. When the
+ * stripe is being written, the whole stream is written to the file.
+ */
+ private class BufferedStream implements OutStream.OutputReceiver {
+ private final OutStream outStream;
+ private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+ BufferedStream(String name, int bufferSize,
+ CompressionCodec codec) throws IOException {
+ outStream = new OutStream(name, bufferSize, codec, this);
+ }
+
+ /**
+ * Receive a buffer from the compression codec.
+ * @param buffer the buffer to save
+ */
+ @Override
+ public void output(ByteBuffer buffer) {
+ output.add(buffer);
+ }
+
+ /**
+ * @return the number of bytes in buffers that are allocated to this stream.
+ */
+ public long getBufferSize() {
+ long result = 0;
+ for (ByteBuffer buf: output) {
+ result += buf.capacity();
+ }
+ return outStream.getBufferSize() + result;
+ }
+
+ /**
+ * Write any saved buffers to the OutputStream if needed, and clears all the buffers.
+ */
+ public void spillToDiskAndClear() throws IOException {
+ if (!outStream.isSuppressed()) {
+ for (ByteBuffer buffer: output) {
+ rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+ outStream.clear();
+ output.clear();
+ }
+
+ /**
+ * @return The number of bytes that will be written to the output. Assumes the stream writing
+ * into this receiver has already been flushed.
+ */
+ public long getOutputSize() {
+ long result = 0;
+ for (ByteBuffer buffer: output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return outStream.toString();
+ }
+ }
+
+ @Override
+ public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
+ BufferedStream result = streams.get(name);
+ if (result == null) {
+ EnumSet<Modifier> modifiers = createCompressionModifiers(name.getKind());
+ result = new BufferedStream(name.toString(), bufferSize,
+ codec == null ? null : codec.modify(modifiers));
+ streams.put(name, result);
+ }
+ return result.outStream;
+ }
+
+ private EnumSet<Modifier> createCompressionModifiers(Kind kind) {
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ return EnumSet.of(Modifier.TEXT,
+ compressionStrategy == CompressionStrategy.SPEED ? Modifier.FAST : Modifier.DEFAULT);
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ return EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY);
+ default:
+ LOG.warn("Missing ORC compression modifiers for " + kind);
+ return null;
+ }
+ }
+
+ @Override
+ public void finalizeStripe(StripeFooter.Builder footerBuilder,
+ StripeInformation.Builder dirEntry) throws IOException {
+ long indexSize = 0;
+ long dataSize = 0;
+ for (Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+ BufferedStream receiver = pair.getValue();
+ OutStream outStream = receiver.outStream;
+ if (!outStream.isSuppressed()) {
+ outStream.flush();
+ long streamSize = receiver.getOutputSize();
+ StreamName name = pair.getKey();
+ footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn())
+ .setKind(name.getKind()).setLength(streamSize));
+ if (StreamName.Area.INDEX == name.getArea()) {
+ indexSize += streamSize;
+ } else {
+ dataSize += streamSize;
+ }
+ }
+ }
+ dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
+
+ OrcProto.StripeFooter footer = footerBuilder.build();
+ // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+ padStripe(indexSize, dataSize, footer.getSerializedSize());
+
+ // write out the data streams
+ for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
+ pair.getValue().spillToDiskAndClear();
+ }
+ // Write out the footer.
+ writeStripeFooter(footer, dataSize, indexSize, dirEntry);
+ }
+
+ @Override
+ public long estimateMemory() {
+ long result = 0;
+ for (BufferedStream stream: streams.values()) {
+ result += stream.getBufferSize();
+ }
+ return result;
+ }
+
+ @Override
+ public void writeIndexStream(StreamName name, Builder rowIndex) throws IOException {
+ OutStream stream = getOrCreatePhysicalStream(name);
+ rowIndex.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @Override
+ public void writeBloomFilterStream(
+ StreamName name, BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+ OutStream stream = getOrCreatePhysicalStream(name);
+ bloomFilterIndex.build().writeTo(stream);
+ stream.flush();
+ }
+
+ @VisibleForTesting
+ public OutputStream getStream() throws IOException {
+ initialize();
+ return rawWriter;
+ }
+}
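Taken together, PhysicalFsWriter now owns stream buffering, block padding and tail serialization. A minimal, hypothetical driver sketching the call order WriterImpl uses against it; the path, column count and payload are made up, and the file metadata/footer/postscript steps are elided, so this produces a bare file rather than a valid ORC file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.StreamName;

public class StripeLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp/stripe-sketch.orc");
    FileSystem fs = path.getFileSystem(conf);
    PhysicalFsWriter pw = new PhysicalFsWriter(fs, path, 1, OrcFile.writerOptions(conf));
    pw.initialize();                                    // creates the file, writes the ORC magic
    OutStream data = pw.getOrCreatePhysicalStream(
        new StreamName(0, OrcProto.Stream.Kind.DATA));  // buffered until the stripe closes
    data.write(new byte[] {1, 2, 3});
    OrcProto.StripeInformation.Builder dirEntry =
        OrcProto.StripeInformation.newBuilder().setNumberOfRows(3);
    pw.finalizeStripe(OrcProto.StripeFooter.newBuilder(), dirEntry); // pad, spill, stripe footer
    System.out.println("stripe at offset " + dirEntry.getOffset());
    pw.close();
  }
}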
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/PhysicalWriter.java b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
new file mode 100644
index 0000000..83742e4
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/PhysicalWriter.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.Footer;
+import org.apache.orc.OrcProto.Metadata;
+import org.apache.orc.OrcProto.PostScript;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.StripeFooter;
+import org.apache.orc.OrcProto.StripeInformation;
+
+public interface PhysicalWriter {
+
+ /**
+ * Creates all the streams/connections/etc. necessary to write.
+ */
+ void initialize() throws IOException;
+
+ /**
+ * Writes out the file metadata.
+ * @param builder Metadata builder to finalize and write.
+ */
+ void writeFileMetadata(Metadata.Builder builder) throws IOException;
+
+ /**
+ * Writes out the file footer.
+ * @param builder Footer builder to finalize and write.
+ */
+ void writeFileFooter(Footer.Builder builder) throws IOException;
+
+ /**
+ * Writes out the postscript (including the size byte if needed).
+ * @param builder Postscript builder to finalize and write.
+ */
+ void writePostScript(PostScript.Builder builder) throws IOException;
+
+ /**
+ * Creates physical stream to write data to.
+ * @param name Stream name.
+ * @return The output stream.
+ */
+ OutStream getOrCreatePhysicalStream(StreamName name) throws IOException;
+
+ /**
+ * Flushes the data in all the streams, spills them to disk, and writes out the stripe footer.
+ * @param footer Stripe footer to be updated with relevant data and written out.
+ * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+ */
+ void finalizeStripe(StripeFooter.Builder footer,
+ StripeInformation.Builder dirEntry) throws IOException;
+
+ /**
+ * Writes out the index for the stripe column.
+ * @param name Stream name.
+ * @param rowIndex Row index entries to write.
+ */
+ void writeIndexStream(StreamName name, RowIndex.Builder rowIndex) throws IOException;
+
+ /**
+ * Writes out the bloom filter index for the stripe column.
+ * @param streamName Stream name.
+ * @param bloomFilterIndex Bloom filter index to write.
+ */
+ void writeBloomFilterStream(StreamName streamName,
+ BloomFilterIndex.Builder bloomFilterIndex) throws IOException;
+
+ /**
+ * Closes the writer.
+ */
+ void close() throws IOException;
+
+ /**
+ * Force-flushes the writer.
+ */
+ void flush() throws IOException;
+
+ /**
+ * @return the physical writer position (e.g. for updater).
+ */
+ long getRawWriterPosition() throws IOException;
+
+ /** @return physical stripe size, taking padding into account. */
+ long getPhysicalStripeSize();
+
+ /** @return whether the writer is compressed. */
+ boolean isCompressed();
+
+ /**
+ * Appends raw stripe data (e.g. for file merger).
+ * @param stripe Stripe data buffer.
+ * @param offset Stripe data buffer offset.
+ * @param length Stripe data buffer length.
+ * @param dirEntry File metadata entry for the stripe, to be updated with relevant data.
+ * @throws IOException
+ */
+ void appendRawStripe(byte[] stripe, int offset, int length,
+ StripeInformation.Builder dirEntry) throws IOException;
+
+ /**
+ * @return the estimated memory usage for the stripe.
+ */
+ long estimateMemory();
+}
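The interface above is the seam between logical and physical writing. As a sketch of what that seam enables (no such class exists in the patch), a pass-through decorator could layer logging or metrics over any PhysicalWriter without touching WriterImpl:

import java.io.IOException;

import org.apache.orc.OrcProto;
import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.PhysicalWriter;
import org.apache.orc.impl.StreamName;

/** Hypothetical decorator: forwards every PhysicalWriter call to a wrapped instance. */
public class ForwardingPhysicalWriter implements PhysicalWriter {
  private final PhysicalWriter delegate;

  public ForwardingPhysicalWriter(PhysicalWriter delegate) {
    this.delegate = delegate;
  }

  @Override public void initialize() throws IOException { delegate.initialize(); }
  @Override public void writeFileMetadata(OrcProto.Metadata.Builder b) throws IOException {
    delegate.writeFileMetadata(b);
  }
  @Override public void writeFileFooter(OrcProto.Footer.Builder b) throws IOException {
    delegate.writeFileFooter(b);
  }
  @Override public void writePostScript(OrcProto.PostScript.Builder b) throws IOException {
    delegate.writePostScript(b);
  }
  @Override public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException {
    return delegate.getOrCreatePhysicalStream(name);
  }
  @Override public void finalizeStripe(OrcProto.StripeFooter.Builder footer,
      OrcProto.StripeInformation.Builder dirEntry) throws IOException {
    delegate.finalizeStripe(footer, dirEntry);
  }
  @Override public void writeIndexStream(StreamName name,
      OrcProto.RowIndex.Builder rowIndex) throws IOException {
    delegate.writeIndexStream(name, rowIndex);
  }
  @Override public void writeBloomFilterStream(StreamName streamName,
      OrcProto.BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
    delegate.writeBloomFilterStream(streamName, bloomFilterIndex);
  }
  @Override public void close() throws IOException { delegate.close(); }
  @Override public void flush() throws IOException { delegate.flush(); }
  @Override public long getRawWriterPosition() throws IOException {
    return delegate.getRawWriterPosition();
  }
  @Override public long getPhysicalStripeSize() { return delegate.getPhysicalStripeSize(); }
  @Override public boolean isCompressed() { return delegate.isCompressed(); }
  @Override public void appendRawStripe(byte[] stripe, int offset, int length,
      OrcProto.StripeInformation.Builder dirEntry) throws IOException {
    delegate.appendRawStripe(stripe, offset, length, dirEntry);
  }
  @Override public long estimateMemory() { return delegate.estimateMemory(); }
}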
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/ReaderImpl.java b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
index 93fc0ce..70fa628 100644
--- a/orc/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/orc/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -350,7 +350,7 @@ public class ReaderImpl implements Reader {
if (fileMetadata != null) {
this.compressionKind = fileMetadata.getCompressionKind();
this.bufferSize = fileMetadata.getCompressionBufferSize();
- this.codec = WriterImpl.createCodec(compressionKind);
+ this.codec = PhysicalFsWriter.createCodec(compressionKind);
this.metadataSize = fileMetadata.getMetadataSize();
this.stripeStats = fileMetadata.getStripeStats();
this.versionList = fileMetadata.getVersionList();
@@ -459,7 +459,7 @@ public class ReaderImpl implements Reader {
System.arraycopy(buffer.array(), psOffset, psBuffer, 0, psLen);
OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBuffer);
int footerSize = (int) ps.getFooterLength();
- CompressionCodec codec = WriterImpl
+ CompressionCodec codec = PhysicalFsWriter
.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
OrcProto.Footer footer = extractFooter(buffer,
(int) (buffer.position() + ps.getMetadataLength()),
@@ -509,7 +509,7 @@ public class ReaderImpl implements Reader {
int psOffset = readSize - 1 - psLen;
ps = extractPostScript(buffer, path, psLen, psOffset);
bufferSize = (int) ps.getCompressionBlockSize();
- codec = WriterImpl.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
+ codec = PhysicalFsWriter.createCodec(CompressionKind.valueOf(ps.getCompression().name()));
fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
int footerSize = (int) ps.getFooterLength();
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
index 1067957..6100d50 100644
--- a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -71,7 +71,7 @@ public class RecordReaderUtils {
this.fs = properties.getFileSystem();
this.path = properties.getPath();
this.useZeroCopy = properties.getZeroCopy();
- this.codec = WriterImpl.createCodec(properties.getCompression());
+ this.codec = PhysicalFsWriter.createCodec(properties.getCompression());
this.bufferSize = properties.getBufferSize();
this.typeCount = properties.getTypeCount();
if (useZeroCopy) {
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
index b2966e0..b17fb41 100644
--- a/orc/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -21,12 +21,10 @@ package org.apache.orc.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
@@ -35,11 +33,12 @@ import java.util.TreeMap;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.orc.BinaryColumnStatistics;
import org.apache.orc.BloomFilterIO;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.BloomFilterIndex;
+import org.apache.orc.OrcProto.RowIndex;
+import org.apache.orc.OrcProto.Stream;
import org.apache.orc.OrcUtils;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
@@ -48,7 +47,6 @@ import org.apache.orc.Writer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -69,7 +67,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -94,35 +91,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
- private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
- // threshold above which buffer size will be automatically resized
- private static final int COLUMN_COUNT_THRESHOLD = 1000;
-
- private final FileSystem fs;
private final Path path;
- private final long defaultStripeSize;
- private long adjustedStripeSize;
private final int rowIndexStride;
- private final CompressionKind compress;
- private final CompressionCodec codec;
- private final boolean addBlockPadding;
- private final int bufferSize;
- private final long blockSize;
- private final double paddingTolerance;
private final TypeDescription schema;
- // the streams that make up the current stripe
- private final Map<StreamName, BufferedStream> streams =
- new TreeMap<StreamName, BufferedStream>();
-
- private FSDataOutputStream rawWriter = null;
- // the compressed metadata information outStream
- private OutStream writer = null;
- // a protobuf outStream around streamFactory
- private CodedOutputStream protobufWriter = null;
- private long headerLength;
+ @VisibleForTesting
+ protected final PhysicalWriter physWriter;
private int columnCount;
private long rowCount = 0;
private long rowsInStripe = 0;
@@ -142,7 +118,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final OrcFile.WriterCallback callback;
private final OrcFile.WriterContext callbackContext;
private final OrcFile.EncodingStrategy encodingStrategy;
- private final OrcFile.CompressionStrategy compressionStrategy;
private final boolean[] bloomFilterColumns;
private final double bloomFilterFpp;
private boolean writeTimeZone;
@@ -150,7 +125,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
public WriterImpl(FileSystem fs,
Path path,
OrcFile.WriterOptions opts) throws IOException {
- this.fs = fs;
this.path = path;
this.conf = opts.getConfiguration();
this.callback = opts.getCallback();
@@ -166,26 +140,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
callbackContext = null;
}
- this.adjustedStripeSize = opts.getStripeSize();
- this.defaultStripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
- this.compressionStrategy = opts.getCompressionStrategy();
- this.addBlockPadding = opts.getBlockPadding();
- this.blockSize = opts.getBlockSize();
- this.paddingTolerance = opts.getPaddingTolerance();
- this.compress = opts.getCompress();
this.rowIndexStride = opts.getRowIndexStride();
this.memoryManager = opts.getMemoryManager();
buildIndex = rowIndexStride > 0;
- codec = createCodec(compress);
- int numColumns = schema.getMaximumId() + 1;
- if (opts.isEnforceBufferSize()) {
- this.bufferSize = opts.getBufferSize();
- } else {
- this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
- numColumns, opts.getBufferSize());
- }
if (version == OrcFile.Version.V_0_11) {
/* do not write bloom filters for ORC v11 */
this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
@@ -194,6 +153,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
}
this.bloomFilterFpp = opts.getBloomFilterFpp();
+ int numColumns = schema.getMaximumId() + 1;
+ physWriter = new PhysicalFsWriter(fs, path, numColumns, opts);
treeWriter = createTreeWriter(schema, streamFactory, false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
@@ -202,83 +163,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// ensure that we are able to handle callbacks before we register ourselves
memoryManager.addWriter(path, opts.getStripeSize(), this);
- LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
- " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
- compress, bufferSize);
- }
-
- @VisibleForTesting
- public static int getEstimatedBufferSize(long stripeSize, int numColumns,
- int bs) {
- // The worst case is that there are 2 big streams per a column and
- // we want to guarantee that each stream gets ~10 buffers.
- // This keeps buffers small enough that we don't get really small stripe
- // sizes.
- int estBufferSize = (int) (stripeSize / (20 * numColumns));
- estBufferSize = getClosestBufferSize(estBufferSize);
- return estBufferSize > bs ? bs : estBufferSize;
- }
-
- private static int getClosestBufferSize(int estBufferSize) {
- final int kb4 = 4 * 1024;
- final int kb8 = 8 * 1024;
- final int kb16 = 16 * 1024;
- final int kb32 = 32 * 1024;
- final int kb64 = 64 * 1024;
- final int kb128 = 128 * 1024;
- final int kb256 = 256 * 1024;
- if (estBufferSize <= kb4) {
- return kb4;
- } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
- return kb8;
- } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
- return kb16;
- } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
- return kb32;
- } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
- return kb64;
- } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
- return kb128;
- } else {
- return kb256;
- }
- }
-
- public static CompressionCodec createCodec(CompressionKind kind) {
- switch (kind) {
- case NONE:
- return null;
- case ZLIB:
- return new ZlibCodec();
- case SNAPPY:
- return new SnappyCodec();
- case LZO:
- try {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- if (loader == null) {
- loader = WriterImpl.class.getClassLoader();
- }
- @SuppressWarnings("unchecked")
- Class<? extends CompressionCodec> lzo =
- (Class<? extends CompressionCodec>)
- loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
- return lzo.newInstance();
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("LZO is not available.", e);
- } catch (InstantiationException e) {
- throw new IllegalArgumentException("Problem initializing LZO", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Insufficient access to LZO", e);
- }
- default:
- throw new IllegalArgumentException("Unknown compression codec: " +
- kind);
- }
}
@Override
public boolean checkMemory(double newScale) throws IOException {
- long limit = (long) Math.round(adjustedStripeSize * newScale);
+ long limit = (long) Math.round(physWriter.getPhysicalStripeSize() * newScale);
long size = estimateStripeSize();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
@@ -291,116 +180,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return false;
}
- /**
- * This class is used to hold the contents of streams as they are buffered.
- * The TreeWriters write to the outStream and the codec compresses the
- * data as buffers fill up and stores them in the output list. When the
- * stripe is being written, the whole stream is written to the file.
- */
- private class BufferedStream implements OutStream.OutputReceiver {
- private final OutStream outStream;
- private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
- BufferedStream(String name, int bufferSize,
- CompressionCodec codec) throws IOException {
- outStream = new OutStream(name, bufferSize, codec, this);
- }
-
- /**
- * Receive a buffer from the compression codec.
- * @param buffer the buffer to save
- */
- @Override
- public void output(ByteBuffer buffer) {
- output.add(buffer);
- }
-
- /**
- * Get the number of bytes in buffers that are allocated to this stream.
- * @return number of bytes in buffers
- */
- public long getBufferSize() {
- long result = 0;
- for(ByteBuffer buf: output) {
- result += buf.capacity();
- }
- return outStream.getBufferSize() + result;
- }
-
- /**
- * Flush the stream to the codec.
- * @throws IOException
- */
- public void flush() throws IOException {
- outStream.flush();
- }
-
- /**
- * Clear all of the buffers.
- * @throws IOException
- */
- public void clear() throws IOException {
- outStream.clear();
- output.clear();
- }
-
- /**
- * Check the state of suppress flag in output stream
- * @return value of suppress flag
- */
- public boolean isSuppressed() {
- return outStream.isSuppressed();
- }
-
- /**
- * Get the number of bytes that will be written to the output. Assumes
- * the stream has already been flushed.
- * @return the number of bytes
- */
- public long getOutputSize() {
- long result = 0;
- for(ByteBuffer buffer: output) {
- result += buffer.remaining();
- }
- return result;
- }
-
- /**
- * Write the saved compressed buffers to the OutputStream.
- * @param out the stream to write to
- * @throws IOException
- */
- void spillTo(OutputStream out) throws IOException {
- for(ByteBuffer buffer: output) {
- out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
- @Override
- public String toString() {
- return outStream.toString();
- }
- }
-
- /**
- * An output receiver that writes the ByteBuffers to the output stream
- * as they are received.
- */
- private class DirectStream implements OutStream.OutputReceiver {
- private final FSDataOutputStream output;
-
- DirectStream(FSDataOutputStream output) {
- this.output = output;
- }
-
- @Override
- public void output(ByteBuffer buffer) throws IOException {
- output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
private static class RowIndexPositionRecorder implements PositionRecorder {
private final OrcProto.RowIndexEntry.Builder builder;
@@ -430,44 +209,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
OrcProto.Stream.Kind kind
) throws IOException {
final StreamName name = new StreamName(column, kind);
- final EnumSet<CompressionCodec.Modifier> modifiers;
-
- switch (kind) {
- case BLOOM_FILTER:
- case DATA:
- case DICTIONARY_DATA:
- if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
- modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
- CompressionCodec.Modifier.TEXT);
- } else {
- modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
- CompressionCodec.Modifier.TEXT);
- }
- break;
- case LENGTH:
- case DICTIONARY_COUNT:
- case PRESENT:
- case ROW_INDEX:
- case SECONDARY:
- // easily compressed using the fastest modes
- modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
- CompressionCodec.Modifier.BINARY);
- break;
- default:
- LOG.warn("Missing ORC compression modifiers for " + kind);
- modifiers = null;
- break;
- }
+ return physWriter.getOrCreatePhysicalStream(name);
+ }
- BufferedStream result = streams.get(name);
- if (result == null) {
- result = new BufferedStream(name.toString(), bufferSize,
- codec == null ? codec : codec.modify(modifiers));
- streams.put(name, result);
- }
- return result.outStream;
+ public void writeIndex(int column, RowIndex.Builder rowIndex) throws IOException {
+ physWriter.writeIndexStream(new StreamName(column, Stream.Kind.ROW_INDEX), rowIndex);
}
+ public void writeBloomFilter(
+ int column, BloomFilterIndex.Builder bloomFilterIndex) throws IOException {
+ physWriter.writeBloomFilterStream(
+ new StreamName(column, Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
+ }
/**
* Get the next column id.
* @return a number from 0 to the number of columns - 1
@@ -496,7 +249,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* @return are the streams compressed
*/
public boolean isCompressed() {
- return codec != null;
+ return physWriter.isCompressed();
}
/**
@@ -508,14 +261,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
/**
- * Get the compression strategy to use.
- * @return compression strategy
- */
- public OrcFile.CompressionStrategy getCompressionStrategy() {
- return compressionStrategy;
- }
-
- /**
* Get the bloom filter columns
* @return bloom filter columns
*/
@@ -572,8 +317,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
protected final RowIndexPositionRecorder rowIndexPosition;
private final OrcProto.RowIndex.Builder rowIndex;
private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
- private final PositionedOutputStream rowIndexStream;
- private final PositionedOutputStream bloomFilterStream;
protected final BloomFilterIO bloomFilter;
protected final boolean createBloomFilter;
private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
@@ -615,21 +358,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
stripeStatsBuilders = Lists.newArrayList();
- if (streamFactory.buildIndex()) {
- rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
- } else {
- rowIndexStream = null;
- }
if (createBloomFilter) {
bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
- bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
streamFactory.getBloomFilterFPP());
} else {
bloomFilterEntry = null;
bloomFilterIndex = null;
- bloomFilterStream = null;
bloomFilter = null;
}
}
@@ -758,11 +494,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
isPresent.flush();
// if no nulls are found in a stream, then suppress the stream
- if(!foundNulls) {
+ if (!foundNulls) {
isPresentOutStream.suppress();
// since isPresent bitstream is suppressed, update the index to
// remove the positions of the isPresent stream
- if (rowIndexStream != null) {
+ if (streamFactory.buildIndex()) {
removeIsPresentPositions();
}
}
@@ -781,22 +517,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
if (streamFactory.hasWriterTimeZone()) {
builder.setWriterTimezone(TimeZone.getDefault().getID());
}
- if (rowIndexStream != null) {
+ if (streamFactory.buildIndex()) {
if (rowIndex.getEntryCount() != requiredIndexEntries) {
throw new IllegalArgumentException("Column has wrong number of " +
"index entries found: " + rowIndex.getEntryCount() + " expected: " +
requiredIndexEntries);
}
- rowIndex.build().writeTo(rowIndexStream);
- rowIndexStream.flush();
+ streamFactory.writeIndex(id, rowIndex);
}
+
rowIndex.clear();
rowIndexEntry.clear();
// write the bloom filter to out stream
- if (bloomFilterStream != null) {
- bloomFilterIndex.build().writeTo(bloomFilterStream);
- bloomFilterStream.flush();
+ if (createBloomFilter) {
+ streamFactory.writeBloomFilter(id, bloomFilterIndex);
bloomFilterIndex.clear();
bloomFilterEntry.clear();
}
@@ -2463,17 +2198,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@VisibleForTesting
- public FSDataOutputStream getStream() throws IOException {
- if (rawWriter == null) {
- rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
- fs.getDefaultReplication(path), blockSize);
- rawWriter.writeBytes(OrcFile.MAGIC);
- headerLength = rawWriter.getPos();
- writer = new OutStream("metadata", bufferSize, codec,
- new DirectStream(rawWriter));
- protobufWriter = CodedOutputStream.newInstance(writer);
- }
- return rawWriter;
+ public void ensureStream() throws IOException {
+ physWriter.initialize();
}
private void createRowIndexEntry() throws IOException {
@@ -2482,7 +2208,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
private void flushStripe() throws IOException {
- getStream();
+ ensureStream();
if (buildIndex && rowsInIndex != 0) {
createRowIndexEntry();
}
@@ -2493,98 +2219,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// finalize the data for the stripe
int requiredIndexEntries = rowIndexStride == 0 ? 0 :
(int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
- OrcProto.StripeFooter.Builder builder =
- OrcProto.StripeFooter.newBuilder();
+ OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder();
+ OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
+ .newBuilder().setNumberOfRows(rowsInStripe);
treeWriter.writeStripe(builder, requiredIndexEntries);
- long indexSize = 0;
- long dataSize = 0;
- for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
- BufferedStream stream = pair.getValue();
- if (!stream.isSuppressed()) {
- stream.flush();
- StreamName name = pair.getKey();
- long streamSize = pair.getValue().getOutputSize();
- builder.addStreams(OrcProto.Stream.newBuilder()
- .setColumn(name.getColumn())
- .setKind(name.getKind())
- .setLength(streamSize));
- if (StreamName.Area.INDEX == name.getArea()) {
- indexSize += streamSize;
- } else {
- dataSize += streamSize;
- }
- }
- }
- OrcProto.StripeFooter footer = builder.build();
-
- // Do we need to pad the file so the stripe doesn't straddle a block
- // boundary?
- long start = rawWriter.getPos();
- final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize();
- final long available = blockSize - (start % blockSize);
- final long overflow = currentStripeSize - adjustedStripeSize;
- final float availRatio = (float) available / (float) defaultStripeSize;
-
- if (availRatio > 0.0f && availRatio < 1.0f
- && availRatio > paddingTolerance) {
- // adjust default stripe size to fit into remaining space, also adjust
- // the next stripe for correction based on the current stripe size
- // and user specified padding tolerance. Since stripe size can overflow
- // the default stripe size we should apply this correction to avoid
- // writing portion of last stripe to next hdfs block.
- double correction = overflow > 0 ? (double) overflow
- / (double) adjustedStripeSize : 0.0;
-
- // correction should not be greater than user specified padding
- // tolerance
- correction = correction > paddingTolerance ? paddingTolerance
- : correction;
-
- // adjust next stripe size based on current stripe estimate correction
- adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize));
- } else if (availRatio >= 1.0) {
- adjustedStripeSize = defaultStripeSize;
- }
-
- if (availRatio < paddingTolerance && addBlockPadding) {
- long padding = blockSize - (start % blockSize);
- byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
- LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
- padding, availRatio, defaultStripeSize));
- start += padding;
- while (padding > 0) {
- int writeLen = (int) Math.min(padding, pad.length);
- rawWriter.write(pad, 0, writeLen);
- padding -= writeLen;
- }
- adjustedStripeSize = defaultStripeSize;
- } else if (currentStripeSize < blockSize
- && (start % blockSize) + currentStripeSize > blockSize) {
- // even if you don't pad, reset the default stripe size when crossing a
- // block boundary
- adjustedStripeSize = defaultStripeSize;
- }
-
- // write out the data streams
- for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
- BufferedStream stream = pair.getValue();
- if (!stream.isSuppressed()) {
- stream.spillTo(rawWriter);
- }
- stream.clear();
- }
- footer.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- long footerLength = rawWriter.getPos() - start - dataSize - indexSize;
- OrcProto.StripeInformation dirEntry =
- OrcProto.StripeInformation.newBuilder()
- .setOffset(start)
- .setNumberOfRows(rowsInStripe)
- .setIndexLength(indexSize)
- .setDataLength(dataSize)
- .setFooterLength(footerLength).build();
- stripes.add(dirEntry);
+ physWriter.finalizeStripe(builder, dirEntry);
+ stripes.add(dirEntry.build());
rowCount += rowsInStripe;
rowsInStripe = 0;
}
@@ -2645,17 +2285,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return total;
}
- private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
- switch (kind) {
- case NONE: return OrcProto.CompressionKind.NONE;
- case ZLIB: return OrcProto.CompressionKind.ZLIB;
- case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
- case LZO: return OrcProto.CompressionKind.LZO;
- default:
- throw new IllegalArgumentException("Unknown compression " + kind);
- }
- }
-
private void writeFileStatistics(OrcProto.Footer.Builder builder,
TreeWriter writer) throws IOException {
builder.addStatistics(writer.fileStatistics.serialize());
@@ -2664,26 +2293,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- private int writeMetadata() throws IOException {
- getStream();
+ private void writeMetadata() throws IOException {
+ ensureStream();
OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
builder.addStripeStats(ssb.build());
}
- long startPosn = rawWriter.getPos();
- OrcProto.Metadata metadata = builder.build();
- metadata.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- return (int) (rawWriter.getPos() - startPosn);
+ physWriter.writeFileMetadata(builder);
}
- private int writeFooter(long bodyLength) throws IOException {
- getStream();
+ private void writeFooter() throws IOException {
+ ensureStream();
OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
- builder.setContentLength(bodyLength);
- builder.setHeaderLength(headerLength);
builder.setNumberOfRows(rowCount);
builder.setRowIndexStride(rowIndexStride);
// populate raw data size
@@ -2701,45 +2323,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
.setName(entry.getKey()).setValue(entry.getValue()));
}
- long startPosn = rawWriter.getPos();
- OrcProto.Footer footer = builder.build();
- footer.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- return (int) (rawWriter.getPos() - startPosn);
+ physWriter.writeFileFooter(builder);
}
- private int writePostScript(int footerLength, int metadataLength) throws IOException {
+ private void writePostScript() throws IOException {
OrcProto.PostScript.Builder builder =
OrcProto.PostScript.newBuilder()
- .setCompression(writeCompressionKind(compress))
- .setFooterLength(footerLength)
- .setMetadataLength(metadataLength)
.setMagic(OrcFile.MAGIC)
.addVersion(version.getMajor())
.addVersion(version.getMinor())
.setWriterVersion(OrcFile.CURRENT_WRITER.getId());
- if (compress != CompressionKind.NONE) {
- builder.setCompressionBlockSize(bufferSize);
- }
- OrcProto.PostScript ps = builder.build();
- // need to write this uncompressed
- long startPosn = rawWriter.getPos();
- ps.writeTo(rawWriter);
- long length = rawWriter.getPos() - startPosn;
- if (length > 255) {
- throw new IllegalArgumentException("PostScript too large at " + length);
- }
- return (int) length;
+ physWriter.writePostScript(builder);
}
private long estimateStripeSize() {
- long result = 0;
- for(BufferedStream stream: streams.values()) {
- result += stream.getBufferSize();
- }
- result += treeWriter.estimateMemory();
- return result;
+ return physWriter.estimateMemory() + treeWriter.estimateMemory();
}
@Override
@@ -2785,11 +2383,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
memoryManager.removeWriter(path);
// actually close the file
flushStripe();
- int metadataLength = writeMetadata();
- int footerLength = writeFooter(rawWriter.getPos() - metadataLength);
- rawWriter.writeByte(writePostScript(footerLength, metadataLength));
- rawWriter.close();
-
+ writeMetadata();
+ writeFooter();
+ writePostScript();
+ physWriter.close();
}
/**
@@ -2819,13 +2416,13 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
if (callback != null) {
callback.preFooterWrite(callbackContext);
}
- int metaLength = writeMetadata();
- int footLength = writeFooter(rawWriter.getPos() - metaLength);
- rawWriter.writeByte(writePostScript(footLength, metaLength));
+ writeMetadata();
+ writeFooter();
+ writePostScript();
stripesAtLastFlush = stripes.size();
- rawWriter.hflush();
+ physWriter.flush();
}
- return rawWriter.getPos();
+ return physWriter.getRawWriterPosition();
}
@Override
@@ -2839,26 +2436,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
checkArgument(stripeStatistics != null,
"Stripe statistics must not be null");
- getStream();
- long start = rawWriter.getPos();
- long availBlockSpace = blockSize - (start % blockSize);
-
- // see if stripe can fit in the current hdfs block, else pad the remaining
- // space in the block
- if (length < blockSize && length > availBlockSpace &&
- addBlockPadding) {
- byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
- LOG.info(String.format("Padding ORC by %d bytes while merging..",
- availBlockSpace));
- start += availBlockSpace;
- while (availBlockSpace > 0) {
- int writeLen = (int) Math.min(availBlockSpace, pad.length);
- rawWriter.write(pad, 0, writeLen);
- availBlockSpace -= writeLen;
- }
- }
+ ensureStream();
+ OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation.newBuilder();
+ physWriter.appendRawStripe(stripe, offset, length, dirEntry);
- rawWriter.write(stripe);
rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues();
rowCount += rowsInStripe;
@@ -2869,15 +2450,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
updateFileStatistics(stripeStatistics);
// update stripe information
- OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation
- .newBuilder()
- .setOffset(start)
- .setNumberOfRows(rowsInStripe)
+ stripes.add(dirEntry.setNumberOfRows(rowsInStripe)
.setIndexLength(stripeInfo.getIndexLength())
.setDataLength(stripeInfo.getDataLength())
.setFooterLength(stripeInfo.getFooterLength())
- .build();
- stripes.add(dirEntry);
+ .build());
// reset it after writing the stripe
rowsInStripe = 0;
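The padding loop removed above is worth keeping in view, since PhysicalFsWriter.appendRawStripe now owns it. The rule it implements: when an appended stripe would fit within a single HDFS block but not within the space left in the current one, zero-fill to the block boundary so the stripe never straddles two blocks. A minimal sketch under that assumption; the 64K chunk size stands in for the removed HDFS_BUFFER_SIZE constant:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;

    final class BlockPaddingSketch {
      // Returns the offset at which the stripe will actually begin.
      static long padBeforeStripe(FSDataOutputStream rawWriter, long blockSize,
          long stripeLength, boolean addBlockPadding) throws IOException {
        long start = rawWriter.getPos();
        long availBlockSpace = blockSize - (start % blockSize);
        if (addBlockPadding && stripeLength < blockSize
            && stripeLength > availBlockSpace) {
          byte[] pad = new byte[(int) Math.min(64 * 1024, availBlockSpace)];
          start += availBlockSpace;
          while (availBlockSpace > 0) {          // zero-fill to the boundary
            int writeLen = (int) Math.min(availBlockSpace, pad.length);
            rawWriter.write(pad, 0, writeLen);
            availBlockSpace -= writeLen;
          }
        }
        return start;
      }
    }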
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
----------------------------------------------------------------------
diff --git a/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java b/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
index 289a86e..efa3ffb 100644
--- a/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
+++ b/orc/src/test/org/apache/orc/impl/TestOrcWideTable.java
@@ -28,37 +28,37 @@ public class TestOrcWideTable {
@Test
public void testBufferSizeFor1Col() throws IOException {
- assertEquals(128 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+ assertEquals(128 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
1, 128*1024));
}
@Test
public void testBufferSizeFor50Col() throws IOException {
- assertEquals(256 * 1024, WriterImpl.getEstimatedBufferSize(256 * 1024 * 1024,
+ assertEquals(256 * 1024, PhysicalFsWriter.getEstimatedBufferSize(256 * 1024 * 1024,
50, 256*1024));
}
@Test
public void testBufferSizeFor1000Col() throws IOException {
- assertEquals(32 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+ assertEquals(32 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
1000, 128*1024));
}
@Test
public void testBufferSizeFor2000Col() throws IOException {
- assertEquals(16 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+ assertEquals(16 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
2000, 256*1024));
}
@Test
public void testBufferSizeFor4000Col() throws IOException {
- assertEquals(8 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+ assertEquals(8 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
4000, 256*1024));
}
@Test
public void testBufferSizeFor25000Col() throws IOException {
- assertEquals(4 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024,
+ assertEquals(4 * 1024, PhysicalFsWriter.getEstimatedBufferSize(512 * 1024 * 1024,
25000, 256*1024));
}
}
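The six assertions above pin down the sizing heuristic that moves to PhysicalFsWriter.getEstimatedBufferSize: budget the stripe across columns (the historical comment assumed roughly two large streams per column with about ten buffers each, hence a divisor of 20), snap to the closest power-of-two size between 4K and 256K, and cap the result at the configured buffer size. A sketch under those assumptions, which reproduces all six expected values; the constants are reconstructions, not a copy of the new code:

    final class BufferSizeSketch {
      static int getEstimatedBufferSize(long stripeSize, int numColumns,
          int configuredBufferSize) {
        // ~2 large streams per column, ~10 buffers per stream => divide by 20
        int est = (int) (stripeSize / (20L * numColumns));
        int best = 4 * 1024;
        for (int size = 4 * 1024; size <= 256 * 1024; size *= 2) {
          if (Math.abs(est - size) < Math.abs(est - best)) {
            best = size;                 // closest power-of-two buffer size
          }
        }
        return Math.min(best, configuredBufferSize);
      }
    }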
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 8e52907..075c3b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -60,6 +60,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspe
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.orc.impl.PhysicalWriter;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -312,4 +315,9 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
flushInternalBatch();
super.close();
}
+
+ @VisibleForTesting
+ PhysicalWriter getPhysicalWriter() {
+ return physWriter;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/65d8fae0/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index d6b48a3..197c1d2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -122,6 +122,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.orc.*;
+import org.apache.orc.impl.PhysicalFsWriter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -2138,8 +2139,7 @@ public class TestInputOutputFormat {
writer.addRow(new MyRow(i, 2*i));
}
writer.close();
- ((MockOutputStream) ((WriterImpl) writer).getStream())
- .setBlocks(new MockBlock("host0", "host1"));
+ getStreamFromWriter(writer).setBlocks(new MockBlock("host0", "host1"));
// call getsplits
HiveInputFormat<?,?> inputFormat =
@@ -2160,6 +2160,11 @@ public class TestInputOutputFormat {
assertEquals(false, reader.next(key, value));
}
+ private MockOutputStream getStreamFromWriter(Writer writer) throws IOException {
+ PhysicalFsWriter pfr = (PhysicalFsWriter)((WriterImpl) writer).getPhysicalWriter();
+ return (MockOutputStream)pfr.getStream();
+ }
+
/**
* Test vectorization, non-acid, non-combine.
* @throws Exception
@@ -2185,8 +2190,7 @@ public class TestInputOutputFormat {
writer.addRow(new MyRow(i, 2*i));
}
writer.close();
- ((MockOutputStream) ((WriterImpl) writer).getStream())
- .setBlocks(new MockBlock("host0", "host1"));
+ getStreamFromWriter(writer).setBlocks(new MockBlock("host0", "host1"));
// call getsplits
conf.setInt(hive_metastoreConstants.BUCKET_COUNT, 3);
@@ -2226,8 +2230,7 @@ public class TestInputOutputFormat {
}
WriterImpl baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
- ((MockOutputStream) baseWriter.getStream())
- .setBlocks(new MockBlock("host0", "host1"));
+ getStreamFromWriter(baseWriter).setBlocks(new MockBlock("host0", "host1"));
// call getsplits
HiveInputFormat<?, ?> inputFormat =
@@ -2305,7 +2308,7 @@ public class TestInputOutputFormat {
writer.addRow(new MyRow(i, 2*i));
}
writer.close();
- MockOutputStream outputStream = (MockOutputStream) ((WriterImpl) writer).getStream();
+ MockOutputStream outputStream = getStreamFromWriter(writer);
outputStream.setBlocks(new MockBlock("host0", "host1"));
int length0 = outputStream.file.length;
writer =
@@ -2316,7 +2319,7 @@ public class TestInputOutputFormat {
writer.addRow(new MyRow(i, 2*i));
}
writer.close();
- outputStream = (MockOutputStream) ((WriterImpl) writer).getStream();
+ outputStream = getStreamFromWriter(writer);
outputStream.setBlocks(new MockBlock("host1", "host2"));
// call getsplits
@@ -2383,7 +2386,7 @@ public class TestInputOutputFormat {
WriterImpl baseWriter = (WriterImpl) writer.getWriter();
writer.close(false);
- MockOutputStream outputStream = (MockOutputStream) baseWriter.getStream();
+ MockOutputStream outputStream = getStreamFromWriter(baseWriter);
outputStream.setBlocks(new MockBlock("host1", "host2"));
// write a delta file in partition 0
@@ -2394,7 +2397,7 @@ public class TestInputOutputFormat {
writer.insert(10, new MyRow(i, 2*i));
}
WriterImpl deltaWriter = (WriterImpl) writer.getWriter();
- outputStream = (MockOutputStream) deltaWriter.getStream();
+ outputStream = getStreamFromWriter(deltaWriter);
writer.close(false);
outputStream.setBlocks(new MockBlock("host1", "host2"));
@@ -2407,7 +2410,7 @@ public class TestInputOutputFormat {
.bufferSize(1024)
.inspector(inspector));
orc.addRow(new MyRow(1, 2));
- outputStream = (MockOutputStream) ((WriterImpl) orc).getStream();
+ outputStream = getStreamFromWriter(orc);
orc.close();
outputStream.setBlocks(new MockBlock("host3", "host4"));
}
[2/2] hive git commit: HIVE-15381 : don't log the callstack for reduce.xml-doesn't-exist (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Posted by se...@apache.org.
HIVE-15381 : don't log the callstack for reduce.xml-doesn't-exist (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/777477f2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/777477f2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/777477f2
Branch: refs/heads/master
Commit: 777477f25060f35631a20b7d2baef1df6da7c3c0
Parents: 65d8fae
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Dec 8 11:06:04 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Dec 8 11:06:04 2016 -0800
----------------------------------------------------------------------
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/777477f2/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index e8f50f2..98c6c76 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -461,7 +461,7 @@ public final class Utilities {
return gWork;
} catch (FileNotFoundException fnf) {
// happens. e.g.: no reduce work.
- LOG.debug("No plan file found: " + path, fnf);
+ LOG.debug("No plan file found: " + path + "; " + fnf.getMessage());
return null;
} catch (Exception e) {
String msg = "Failed to load plan: " + path;
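The one-line change is a standard logging idiom: handing the Throwable to the logger prints a full stack trace, while appending only getMessage() keeps an expected condition (a missing reduce.xml just means there is no reduce work) to a single DEBUG line. A minimal, hypothetical illustration of the before and after:

    import java.io.FileNotFoundException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class PlanLoadLogging {
      private static final Logger LOG =
          LoggerFactory.getLogger(PlanLoadLogging.class);

      static void logMissingPlan(String path, FileNotFoundException fnf) {
        // before: LOG.debug("No plan file found: " + path, fnf);  // callstack
        // after: message only, since a missing plan file is an expected case
        LOG.debug("No plan file found: " + path + "; " + fnf.getMessage());
      }
    }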