Posted to commits@hive.apache.org by ke...@apache.org on 2013/03/05 21:44:52 UTC
svn commit: r1452992 [6/8] - in /hive/trunk: ./ ivy/ ql/
ql/src/gen/protobuf/ ql/src/gen/protobuf/gen-java/
ql/src/gen/protobuf/gen-java/org/ ql/src/gen/protobuf/gen-java/org/apache/
ql/src/gen/protobuf/gen-java/org/apache/hadoop/ ql/src/gen/protobuf/g...
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,1415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which are the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * buffered data reaches the stripe size, and then it is written out, broken
+ * down by columns. Each column is written by a TreeWriter that is specific to
+ * that type of column. TreeWriters may have child TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ */
+class WriterImpl implements Writer {
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ private final FileSystem fs;
+ private final Path path;
+ private final long stripeSize;
+ private final int rowIndexStride;
+ private final CompressionKind compress;
+ private final CompressionCodec codec;
+ private final int bufferSize;
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf CodedOutputStream around the metadata outStream
+ private CodedOutputStream protobufWriter = null;
+ private long headerLength;
+ private int columnCount;
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private int rowsInIndex = 0;
+ private final List<OrcProto.StripeInformation> stripes =
+ new ArrayList<OrcProto.StripeInformation>();
+ private final Map<String, ByteString> userMetadata =
+ new TreeMap<String, ByteString>();
+ private final StreamFactory streamFactory = new StreamFactory();
+ private final TreeWriter treeWriter;
+ private final OrcProto.RowIndex.Builder rowIndex =
+ OrcProto.RowIndex.newBuilder();
+ private final boolean buildIndex;
+
+ WriterImpl(FileSystem fs,
+ Path path,
+ ObjectInspector inspector,
+ long stripeSize,
+ CompressionKind compress,
+ int bufferSize,
+ int rowIndexStride) throws IOException {
+ this.fs = fs;
+ this.path = path;
+ this.stripeSize = stripeSize;
+ this.compress = compress;
+ this.bufferSize = bufferSize;
+ this.rowIndexStride = rowIndexStride;
+ buildIndex = rowIndexStride > 0;
+ codec = createCodec(compress);
+ treeWriter = createTreeWriter(inspector, streamFactory, false);
+ if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+ throw new IllegalArgumentException("Row stride must be at least " +
+ MIN_ROW_INDEX_STRIDE);
+ }
+ }
+
+ static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
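+ // the LZO codec is optional, so it is loaded reflectively rather than
+ // referenced directly; if the class is absent, LZO is simply unavailable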
+ try {
+ Class<? extends CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ Class.forName("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO", e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
+ }
+ }
+
+ /**
+ * This class holds the contents of streams as they are buffered. The
+ * TreeWriters write to the outStream; as buffers fill up, the codec
+ * compresses them and stores the results in the output list. When the
+ * stripe is written, the whole stream is written to the file.
+ */
+ private class BufferedStream implements OutStream.OutputReceiver {
+ private final OutStream outStream;
+ private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+ BufferedStream(String name, int bufferSize,
+ CompressionCodec codec) throws IOException {
+ outStream = new OutStream(name, bufferSize, codec, this);
+ }
+
+ /**
+ * Receive a buffer from the compression codec.
+ * @param buffer the buffer to save
+ */
+ @Override
+ public void output(ByteBuffer buffer) {
+ output.add(buffer);
+ }
+
+ /**
+ * Flush the stream to the codec.
+ * @throws IOException
+ */
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * Clear all of the buffers.
+ * @throws IOException
+ */
+ public void clear() throws IOException {
+ outStream.clear();
+ output.clear();
+ }
+
+ /**
+ * Write the saved compressed buffers to the OutputStream.
+ * @param out the stream to write to
+ * @throws IOException
+ */
+ void spillTo(OutputStream out) throws IOException {
+ for(ByteBuffer buffer: output) {
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ /**
+ * Get the size of compressed and uncompressed data in the stream's buffers.
+ * @return the number of bytes in the buffers.
+ */
+ long getSize() {
+ return outStream.getSize();
+ }
+ }
+
+ /**
+ * An output receiver that writes the ByteBuffers to the output stream
+ * as they are received.
+ */
+ private class DirectStream implements OutStream.OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ private static class RowIndexPositionRecorder implements PositionRecorder {
+ private final OrcProto.RowIndexEntry.Builder builder;
+
+ RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public void addPosition(long position) {
+ builder.addPositions(position);
+ }
+ }
+
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return the stream that the section should be written to
+ * @throws IOException
+ */
+ public PositionedOutputStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ StreamName name = new StreamName(column, kind);
+ BufferedStream result = streams.get(name);
+ if (result == null) {
+ result = new BufferedStream(name.toString(), bufferSize, codec);
+ streams.put(name, result);
+ }
+ return result.outStream;
+ }
+
+ /**
+ * Get the next column id.
+ * @return a number from 0 to the number of columns - 1
+ */
+ public int getNextColumnId() {
+ return columnCount++;
+ }
+
+ /**
+ * Get the stride rate of the row index.
+ */
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ /**
+ * Should the writer build a row index?
+ * @return true if we are building the index
+ */
+ public boolean buildIndex() {
+ return buildIndex;
+ }
+ }
+
+ /**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have child tree writers that write the child
+ * types.
+ */
+ private abstract static class TreeWriter {
+ protected final int id;
+ protected final ObjectInspector inspector;
+ private final BitFieldWriter isPresent;
+ protected final ColumnStatisticsImpl indexStatistics;
+ private final ColumnStatisticsImpl fileStatistics;
+ protected TreeWriter[] childrenWriters;
+ protected final RowIndexPositionRecorder rowIndexPosition;
+ private final OrcProto.RowIndex.Builder rowIndex;
+ private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+ private final PositionedOutputStream rowIndexStream;
+
+ /**
+ * Create a tree writer
+ * @param columnId the column id of the column to write
+ * @param inspector the object inspector to use
+ * @param streamFactory limited access to the Writer's data.
+ * @param nullable can the value be null?
+ * @throws IOException
+ */
+ TreeWriter(int columnId, ObjectInspector inspector,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ this.id = columnId;
+ this.inspector = inspector;
+ if (nullable) {
+ isPresent = new BitFieldWriter(streamFactory.createStream(id,
+ OrcProto.Stream.Kind.PRESENT), 1);
+ } else {
+ isPresent = null;
+ }
+ indexStatistics = ColumnStatisticsImpl.create(inspector);
+ fileStatistics = ColumnStatisticsImpl.create(inspector);
+ childrenWriters = new TreeWriter[0];
+ rowIndex = OrcProto.RowIndex.newBuilder();
+ rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+ rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+ if (streamFactory.buildIndex()) {
+ rowIndexStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.ROW_INDEX);
+ } else {
+ rowIndexStream = null;
+ }
+ }
+
+ protected OrcProto.RowIndex.Builder getRowIndex() {
+ return rowIndex;
+ }
+
+ protected ColumnStatisticsImpl getFileStatistics() {
+ return fileStatistics;
+ }
+
+ protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+ return rowIndexEntry;
+ }
+
+ /**
+ * Add a new value to the column.
+ * @param obj the value to add to the column
+ * @throws IOException
+ */
+ void write(Object obj) throws IOException {
+ if (obj != null) {
+ indexStatistics.increment();
+ }
+ if (isPresent != null) {
+ isPresent.write(obj == null ? 0 : 1);
+ }
+ }
+
+ /**
+ * Write the stripe out to the file.
+ * @param builder the stripe footer that contains the information about the
+ * layout of the stripe. The TreeWriter is required to update
+ * the footer with its information.
+ * @param requiredIndexEntries the number of index entries that are
+ * required; this is checked to make sure the
+ * row index is well formed.
+ * @throws IOException
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ if (isPresent != null) {
+ isPresent.flush();
+ }
+ builder.addColumns(getEncoding());
+ if (rowIndexStream != null) {
+ if (rowIndex.getEntryCount() != requiredIndexEntries) {
+ throw new IllegalArgumentException("Column has wrong number of " +
+ "index entries: found " + rowIndex.getEntryCount() + ", expected " +
+ requiredIndexEntries);
+ }
+ rowIndex.build().writeTo(rowIndexStream);
+ rowIndexStream.flush();
+ }
+ rowIndex.clear();
+ rowIndexEntry.clear();
+ }
+
+ TreeWriter[] getChildrenWriters() {
+ return childrenWriters;
+ }
+
+ /**
+ * Get the encoding for this column.
+ * @return the information about the encoding of this column
+ */
+ OrcProto.ColumnEncoding getEncoding() {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ /**
+ * Create a row index entry with the previous location and the current
+ * index statistics. Also merges the index statistics into the file
+ * statistics before they are cleared. Finally, it records the start of the
+ * next index and ensures all of the child columns also create an entry.
+ * @throws IOException
+ */
+ void createRowIndexEntry() throws IOException {
+ fileStatistics.merge(indexStatistics);
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ rowIndex.addEntry(rowIndexEntry);
+ rowIndexEntry.clear();
+ recordPosition(rowIndexPosition);
+ for(TreeWriter child: childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ /**
+ * Record the current position in each of this column's streams.
+ * @param recorder where the positions should be recorded
+ * @throws IOException
+ */
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ if (isPresent != null) {
+ isPresent.getPosition(recorder);
+ }
+ }
+
+ /**
+ * Estimate how much memory the writer is consuming excluding the streams.
+ * @return the number of bytes.
+ */
+ long estimateMemory() {
+ long result = 0;
+ for (TreeWriter child: childrenWriters) {
+ result += child.estimateMemory();
+ }
+ return result;
+ }
+ }
+
+ private static class BooleanTreeWriter extends TreeWriter {
+ private final BitFieldWriter writer;
+
+ BooleanTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ PositionedOutputStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.writer = new BitFieldWriter(out, 1);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ boolean val = ((BooleanObjectInspector) inspector).get(obj);
+ indexStatistics.updateBoolean(val);
+ writer.write(val ? 1 : 0);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class ByteTreeWriter extends TreeWriter {
+ private final RunLengthByteWriter writer;
+
+ ByteTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.writer = new RunLengthByteWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA));
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ byte val = ((ByteObjectInspector) inspector).get(obj);
+ indexStatistics.updateInteger(val);
+ writer.write(val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class IntegerTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter writer;
+ private final ShortObjectInspector shortInspector;
+ private final IntObjectInspector intInspector;
+ private final LongObjectInspector longInspector;
+
+ IntegerTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ PositionedOutputStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.writer = new RunLengthIntegerWriter(out, true);
+ if (inspector instanceof IntObjectInspector) {
+ intInspector = (IntObjectInspector) inspector;
+ shortInspector = null;
+ longInspector = null;
+ } else {
+ intInspector = null;
+ if (inspector instanceof LongObjectInspector) {
+ longInspector = (LongObjectInspector) inspector;
+ shortInspector = null;
+ } else {
+ shortInspector = (ShortObjectInspector) inspector;
+ longInspector = null;
+ }
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ long val;
+ if (intInspector != null) {
+ val = intInspector.get(obj);
+ } else if (longInspector != null) {
+ val = longInspector.get(obj);
+ } else {
+ val = shortInspector.get(obj);
+ }
+ indexStatistics.updateInteger(val);
+ writer.write(val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class FloatTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+
+ FloatTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ float val = ((FloatObjectInspector) inspector).get(obj);
+ indexStatistics.updateDouble(val);
+ SerializationUtils.writeFloat(stream, val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+ }
+
+ private static class DoubleTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+
+ DoubleTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ double val = ((DoubleObjectInspector) inspector).get(obj);
+ indexStatistics.updateDouble(val);
+ SerializationUtils.writeDouble(stream, val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+ }
+
+ private static class StringTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stringOutput;
+ private final RunLengthIntegerWriter lengthOutput;
+ private final RunLengthIntegerWriter rowOutput;
+ private final RunLengthIntegerWriter countOutput;
+ private final StringRedBlackTree dictionary = new StringRedBlackTree();
+ private final DynamicIntArray rows = new DynamicIntArray();
+ private final List<OrcProto.RowIndexEntry> savedRowIndex =
+ new ArrayList<OrcProto.RowIndexEntry>();
+ private final boolean buildIndex;
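+ // rowIndexValueCount.get(i) records how many values had been buffered when
+ // index entry i was created; writeStripe uses it to patch the final row
+ // output positions into the saved entries once the dictionary is sorted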
+ private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+
+ StringTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ stringOutput = writer.createStream(id,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ lengthOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ rowOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false);
+ if (writer.buildIndex()) {
+ countOutput = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DICTIONARY_COUNT), false);
+ } else {
+ countOutput = null;
+ }
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+ buildIndex = writer.buildIndex();
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ String val = ((StringObjectInspector) inspector)
+ .getPrimitiveJavaObject(obj);
+ rows.add(dictionary.add(val));
+ indexStatistics.updateString(val);
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ // Traverse the red-black tree, writing out the bytes and lengths, and
+ // creating the map from the original order to the final sorted order.
+ final int[] dumpOrder = new int[dictionary.size()];
+ dictionary.visit(new StringRedBlackTree.Visitor() {
+ private int currentId = 0;
+ @Override
+ public void visit(StringRedBlackTree.VisitorContext context
+ ) throws IOException {
+ context.writeBytes(stringOutput);
+ lengthOutput.write(context.getLength());
+ dumpOrder[context.getOriginalPosition()] = currentId++;
+ if (countOutput != null) {
+ countOutput.write(context.getCount());
+ }
+ }
+ });
+ int length = rows.size();
+ int rowIndexEntry = 0;
+ OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+ // we need to build any leading index entries out here, to handle the
+ // case of not having any values yet.
+ if (buildIndex) {
+ while (0 == rowIndexValueCount.get(rowIndexEntry) &&
+ rowIndexEntry < savedRowIndex.size()) {
+ OrcProto.RowIndexEntry.Builder base =
+ savedRowIndex.get(rowIndexEntry++).toBuilder();
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ rowIndex.addEntry(base.build());
+ }
+ }
+ // write the values translated into the dump order.
+ for(int i = 0; i < length; ++i) {
+ // now that we are writing out the row values, we can finalize the
+ // row index
+ if (buildIndex) {
+ while (i == rowIndexValueCount.get(rowIndexEntry) &&
+ rowIndexEntry < savedRowIndex.size()) {
+ OrcProto.RowIndexEntry.Builder base =
+ savedRowIndex.get(rowIndexEntry++).toBuilder();
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ rowIndex.addEntry(base.build());
+ }
+ }
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ }
+ // we need to build the row index before calling super, since it
+ // writes it out.
+ super.writeStripe(builder, requiredIndexEntries);
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ if (countOutput != null) {
+ countOutput.flush();
+ }
+ // reset all of the fields to be ready for the next stripe.
+ dictionary.clear();
+ rows.clear();
+ savedRowIndex.clear();
+ rowIndexValueCount.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY).
+ setDictionarySize(dictionary.size()).build();
+ }
+
+ /**
+ * This method doesn't call the super method, because unlike most of the
+ * other TreeWriters, this one can't record the position in the streams
+ * until the stripe is being flushed. Therefore it saves all of the entries
+ * and augments them with the final information as the stripe is written.
+ * @throws IOException
+ */
+ @Override
+ void createRowIndexEntry() throws IOException {
+ getFileStatistics().merge(indexStatistics);
+ OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ savedRowIndex.add(rowIndexEntry.build());
+ rowIndexEntry.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(Long.valueOf(rows.size()));
+ }
+
+ @Override
+ long estimateMemory() {
+ return 4L * rows.size() + dictionary.getByteSize();
+ }
+ }
+
+ private static class BinaryTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+ private final RunLengthIntegerWriter length;
+
+ BinaryTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.length = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ BytesWritable val =
+ ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
+ stream.write(val.getBytes(), 0, val.getLength());
+ length.write(val.getLength());
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ length.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ length.getPosition(recorder);
+ }
+ }
+
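+ // seconds in the timestamp stream are written as deltas from this base
+ // (2015-01-01) so that typical values encode as small signed integers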
+ static final int MILLIS_PER_SECOND = 1000;
+ static final long BASE_TIMESTAMP =
+ Timestamp.valueOf("2015-01-01 00:00:00").getTime() / MILLIS_PER_SECOND;
+
+ private static class TimestampTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter seconds;
+ private final RunLengthIntegerWriter nanos;
+
+ TimestampTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ this.seconds = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true);
+ this.nanos = new RunLengthIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.NANO_DATA), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ Timestamp val =
+ ((TimestampObjectInspector) inspector).
+ getPrimitiveJavaObject(obj);
+ seconds.write((val.getTime() / MILLIS_PER_SECOND) - BASE_TIMESTAMP);
+ nanos.write(formatNanos(val.getNanos()));
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ seconds.flush();
+ nanos.flush();
+ recordPosition(rowIndexPosition);
+ }
+
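+ /**
+ * Encode nanoseconds compactly: the value is shifted left three bits and
+ * the low three bits record how many trailing decimal zeros were removed.
+ * A count of 0 means the stored value is the raw nanoseconds; a count
+ * c >= 1 means the reader must multiply the stored value by 10^(c+1).
+ */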
+ private static long formatNanos(int nanos) {
+ if (nanos == 0) {
+ return 0;
+ } else if (nanos % 100 != 0) {
+ return ((long) nanos) << 3;
+ } else {
+ nanos /= 100;
+ int trailingZeros = 1;
+ while (nanos % 10 == 0 && trailingZeros < 7) {
+ nanos /= 10;
+ trailingZeros += 1;
+ }
+ return ((long) nanos) << 3 | trailingZeros;
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ seconds.getPosition(recorder);
+ nanos.getPosition(recorder);
+ }
+ }
+
+ private static class StructTreeWriter extends TreeWriter {
+ private final List<? extends StructField> fields;
+ StructTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ StructObjectInspector structObjectInspector =
+ (StructObjectInspector) inspector;
+ fields = structObjectInspector.getAllStructFieldRefs();
+ childrenWriters = new TreeWriter[fields.size()];
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] = createTreeWriter(
+ fields.get(i).getFieldObjectInspector(), writer, true);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ StructObjectInspector insp = (StructObjectInspector) inspector;
+ for(int i = 0; i < fields.size(); ++i) {
+ StructField field = fields.get(i);
+ TreeWriter writer = childrenWriters[i];
+ writer.write(insp.getStructFieldData(obj, field));
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ private static class ListTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter lengths;
+
+ ListTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ ListObjectInspector listObjectInspector = (ListObjectInspector) inspector;
+ childrenWriters = new TreeWriter[1];
+ childrenWriters[0] =
+ createTreeWriter(listObjectInspector.getListElementObjectInspector(),
+ writer, true);
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ ListObjectInspector insp = (ListObjectInspector) inspector;
+ int len = insp.getListLength(obj);
+ lengths.write(len);
+ for(int i=0; i < len; ++i) {
+ childrenWriters[0].write(insp.getListElement(obj, i));
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ lengths.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+ }
+
+ private static class MapTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter lengths;
+
+ MapTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ MapObjectInspector insp = (MapObjectInspector) inspector;
+ childrenWriters = new TreeWriter[2];
+ childrenWriters[0] =
+ createTreeWriter(insp.getMapKeyObjectInspector(), writer, true);
+ childrenWriters[1] =
+ createTreeWriter(insp.getMapValueObjectInspector(), writer, true);
+ lengths =
+ new RunLengthIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ MapObjectInspector insp = (MapObjectInspector) inspector;
+ int len = insp.getMapSize(obj);
+ lengths.write(len);
+ // this sucks, but it will have to do until we can get a better
+ // accessor in the MapObjectInspector.
+ Map<?, ?> valueMap = insp.getMap(obj);
+ for(Map.Entry<?, ?> entry: valueMap.entrySet()) {
+ childrenWriters[0].write(entry.getKey());
+ childrenWriters[1].write(entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ lengths.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+ }
+
+ private static class UnionTreeWriter extends TreeWriter {
+ private final RunLengthByteWriter tags;
+
+ UnionTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ UnionObjectInspector insp = (UnionObjectInspector) inspector;
+ List<ObjectInspector> choices = insp.getObjectInspectors();
+ childrenWriters = new TreeWriter[choices.size()];
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] = createTreeWriter(choices.get(i), writer, true);
+ }
+ tags =
+ new RunLengthByteWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.DATA));
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ UnionObjectInspector insp = (UnionObjectInspector) inspector;
+ byte tag = insp.getTag(obj);
+ tags.write(tag);
+ childrenWriters[tag].write(insp.getField(obj));
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ tags.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ tags.getPosition(recorder);
+ }
+ }
+
+ private static TreeWriter createTreeWriter(ObjectInspector inspector,
+ StreamFactory streamFactory,
+ boolean nullable
+ ) throws IOException {
+ switch (inspector.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
+ case BOOLEAN:
+ return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case BYTE:
+ return new ByteTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case FLOAT:
+ return new FloatTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case DOUBLE:
+ return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case STRING:
+ return new StringTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case BINARY:
+ return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ case TIMESTAMP:
+ return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad primitive category " +
+ ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
+ }
+ case STRUCT:
+ return new StructTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ case MAP:
+ return new MapTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ case LIST:
+ return new ListTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ case UNION:
+ return new UnionTreeWriter(streamFactory.getNextColumnId(), inspector,
+ streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad category: " +
+ inspector.getCategory());
+ }
+ }
+
+ private static void writeTypes(OrcProto.Footer.Builder builder,
+ TreeWriter treeWriter) {
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ switch (treeWriter.inspector.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveObjectInspector) treeWriter.inspector).
+ getPrimitiveCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown primitive category: " +
+ ((PrimitiveObjectInspector) treeWriter.inspector).
+ getPrimitiveCategory());
+ }
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(treeWriter.childrenWriters[0].id);
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ type.addSubtypes(treeWriter.childrenWriters[0].id);
+ type.addSubtypes(treeWriter.childrenWriters[1].id);
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TreeWriter child: treeWriter.childrenWriters) {
+ type.addSubtypes(child.id);
+ }
+ for(StructField field: ((StructTreeWriter) treeWriter).fields) {
+ type.addFieldNames(field.getFieldName());
+ }
+ break;
+ case UNION:
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for(TreeWriter child: treeWriter.childrenWriters) {
+ type.addSubtypes(child.id);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " +
+ treeWriter.inspector.getCategory());
+ }
+ builder.addTypes(type);
+ for(TreeWriter child: treeWriter.childrenWriters) {
+ writeTypes(builder, child);
+ }
+ }
+
+ private void ensureWriter() throws IOException {
+ if (rawWriter == null) {
+ rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
+ fs.getDefaultReplication(),
+ Math.min(stripeSize * 2L, Integer.MAX_VALUE));
+ rawWriter.writeBytes(OrcFile.MAGIC);
+ headerLength = rawWriter.getPos();
+ writer = new OutStream("metadata", bufferSize, codec,
+ new DirectStream(rawWriter));
+ protobufWriter = CodedOutputStream.newInstance(writer);
+ }
+ }
+
+ private void createRowIndexEntry() throws IOException {
+ treeWriter.createRowIndexEntry();
+ rowsInIndex = 0;
+ }
+
+ private void flushStripe() throws IOException {
+ ensureWriter();
+ if (buildIndex && rowsInIndex != 0) {
+ createRowIndexEntry();
+ }
+ if (rowsInStripe != 0) {
+ int requiredIndexEntries = rowIndexStride == 0 ? 0 :
+ (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+ OrcProto.StripeFooter.Builder builder =
+ OrcProto.StripeFooter.newBuilder();
+ treeWriter.writeStripe(builder, requiredIndexEntries);
+ long start = rawWriter.getPos();
+ long section = start;
+ long indexEnd = start;
+ for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+ BufferedStream stream = pair.getValue();
+ stream.flush();
+ stream.spillTo(rawWriter);
+ stream.clear();
+ long end = rawWriter.getPos();
+ StreamName name = pair.getKey();
+ builder.addStreams(OrcProto.Stream.newBuilder()
+ .setColumn(name.getColumn())
+ .setKind(name.getKind())
+ .setLength(end-section));
+ section = end;
+ if (StreamName.Area.INDEX == name.getArea()) {
+ indexEnd = end;
+ }
+ }
+ builder.build().writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ long end = rawWriter.getPos();
+ OrcProto.StripeInformation dirEntry =
+ OrcProto.StripeInformation.newBuilder()
+ .setOffset(start)
+ .setIndexLength(indexEnd - start)
+ .setDataLength(section - indexEnd)
+ .setNumberOfRows(rowsInStripe)
+ .setFooterLength(end - section).build();
+ stripes.add(dirEntry);
+ rowCount += rowsInStripe;
+ rowsInStripe = 0;
+ }
+ }
+
+ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+ switch (kind) {
+ case NONE: return OrcProto.CompressionKind.NONE;
+ case ZLIB: return OrcProto.CompressionKind.ZLIB;
+ case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+ case LZO: return OrcProto.CompressionKind.LZO;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + kind);
+ }
+ }
+
+ private void writeFileStatistics(OrcProto.Footer.Builder builder,
+ TreeWriter writer) throws IOException {
+ builder.addStatistics(writer.fileStatistics.serialize());
+ for(TreeWriter child: writer.getChildrenWriters()) {
+ writeFileStatistics(builder, child);
+ }
+ }
+
+ private int writeFooter(long bodyLength) throws IOException {
+ ensureWriter();
+ OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+ builder.setContentLength(bodyLength);
+ builder.setHeaderLength(headerLength);
+ builder.setNumberOfRows(rowCount);
+ builder.setRowIndexStride(rowIndexStride);
+ // serialize the types
+ writeTypes(builder, treeWriter);
+ // add the stripe information
+ for(OrcProto.StripeInformation stripe: stripes) {
+ builder.addStripes(stripe);
+ }
+ // add the column statistics
+ writeFileStatistics(builder, treeWriter);
+ // add all of the user metadata
+ for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
+ builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
+ .setName(entry.getKey()).setValue(entry.getValue()));
+ }
+ long startPosn = rawWriter.getPos();
+ builder.build().writeTo(protobufWriter);
+ protobufWriter.flush();
+ writer.flush();
+ return (int) (rawWriter.getPos() - startPosn);
+ }
+
+ private int writePostScript(int footerLength) throws IOException {
+ OrcProto.PostScript.Builder builder =
+ OrcProto.PostScript.newBuilder()
+ .setCompression(writeCompressionKind(compress))
+ .setFooterLength(footerLength);
+ if (compress != CompressionKind.NONE) {
+ builder.setCompressionBlockSize(bufferSize);
+ }
+ OrcProto.PostScript ps = builder.build();
+ // need to write this uncompressed
+ long startPosn = rawWriter.getPos();
+ ps.writeTo(rawWriter);
+ long length = rawWriter.getPos() - startPosn;
+ if (length > 255) {
+ throw new IllegalArgumentException("PostScript too large at " + length);
+ }
+ return (int) length;
+ }
+
+ private long estimateStripeSize() {
+ long result = 0;
+ for(BufferedStream stream: streams.values()) {
+ result += stream.getSize();
+ }
+ result += treeWriter.estimateMemory();
+ return result;
+ }
+
+ @Override
+ public void addUserMetadata(String name, ByteBuffer value) {
+ userMetadata.put(name, ByteString.copyFrom(value));
+ }
+
+ @Override
+ public void addRow(Object row) throws IOException {
+ treeWriter.write(row);
+ rowsInStripe += 1;
+ if (buildIndex) {
+ rowsInIndex += 1;
+
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
+ }
+ }
+ // once every 1000 rows, check the size to see if we should spill
+ if (rowsInStripe % 1000 == 0 && estimateStripeSize() > stripeSize) {
+ flushStripe();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushStripe();
+ int footerLength = writeFooter(rawWriter.getPos());
+ rawWriter.writeByte(writePostScript(footerLength));
+ rawWriter.close();
+ }
+}
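
A minimal sketch of driving the writer above, assuming code living in the
same org.apache.hadoop.hive.ql.io.orc package (WriterImpl is
package-private) and a hypothetical MyRow bean used only for illustration;
the object inspector is obtained through reflection, as the Hive serde2
ObjectInspectorFactory supports:

    package org.apache.hadoop.hive.ql.io.orc;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    public class WriterSketch {
      static class MyRow {              // hypothetical row type
        int x;
        String y;
        MyRow(int x, String y) { this.x = x; this.y = y; }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/example.orc");   // hypothetical output path
        FileSystem fs = FileSystem.get(conf);
        ObjectInspector inspector = ObjectInspectorFactory
            .getReflectionObjectInspector(MyRow.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        Writer writer = new WriterImpl(fs, path, inspector,
            256 * 1024 * 1024,      // stripe size: flush once buffers reach this
            CompressionKind.ZLIB,   // generic compression for all streams
            256 * 1024,             // compression buffer size
            10000);                 // row index stride
        writer.addRow(new MyRow(1, "hello"));
        writer.addRow(new MyRow(2, "world"));
        writer.close();             // writes last stripe, footer, and postscript
      }
    }
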
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+class ZlibCodec implements CompressionCodec {
+
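+ // compress returns true only when the deflated output is smaller than the
+ // input; a false return tells the caller to keep the bytes uncompressed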
+ @Override
+ public boolean compress(ByteBuffer in, ByteBuffer out,
+ ByteBuffer overflow) throws IOException {
+ Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ int length = in.remaining();
+ deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
+ deflater.finish();
+ int outSize = 0;
+ int offset = out.arrayOffset() + out.position();
+ while (!deflater.finished() && (length > outSize)) {
+ int size = deflater.deflate(out.array(), offset, out.remaining());
+ out.position(size + out.position());
+ outSize += size;
+ offset += size;
+ // if we run out of space in the out buffer, use the overflow
+ if (out.remaining() == 0) {
+ if (overflow == null) {
+ deflater.end();
+ return false;
+ }
+ out = overflow;
+ offset = out.arrayOffset() + out.position();
+ }
+ }
+ deflater.end();
+ return length > outSize;
+ }
+
+ @Override
+ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+ Inflater inflater = new Inflater(true);
+ inflater.setInput(in.array(), in.arrayOffset() + in.position(),
+ in.remaining());
+ while (!(inflater.finished() || inflater.needsDictionary() ||
+ inflater.needsInput())) {
+ try {
+ int count = inflater.inflate(out.array(),
+ out.arrayOffset() + out.position(),
+ out.remaining());
+ out.position(count + out.position());
+ } catch (DataFormatException dfe) {
+ throw new IOException("Bad compression data", dfe);
+ }
+ }
+ out.flip();
+ inflater.end();
+ in.position(in.limit());
+ }
+
+}
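
A hedged round-trip sketch of the codec contract as implemented above:
compress returns false when deflation does not shrink the data (the caller
then keeps the raw bytes), and decompress expects an output buffer sized
for the uncompressed result. Both buffers must be array-backed, as the
codec reads and writes through array()/arrayOffset():

    ZlibCodec codec = new ZlibCodec();
    byte[] raw = new byte[4096];                  // all zeros: compresses well
    ByteBuffer in = ByteBuffer.wrap(raw);
    ByteBuffer out = ByteBuffer.allocate(1024);
    if (codec.compress(in, out, null)) {          // no overflow buffer given
      out.flip();
      ByteBuffer restored = ByteBuffer.allocate(raw.length);
      codec.decompress(out, restored);            // flips restored when done
      // restored.remaining() == raw.length here
    } else {
      // deflate did not help; a caller would store the original bytes
    }
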
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/package-info.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/package-info.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/package-info.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/package-info.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The Optimized Row Columnar (ORC) File Format.
+ *
+ * This format:
+ * <ul>
+ * <li>Decomposes complex column types into primitives</li>
+ * <li>Uses type-specific encoders for each column
+ * <ul>
+ * <li>Dictionary encodings for low cardinality columns</li>
+ * <li>Run length encoding of data</li>
+ * <li>Variable length encoding of integers</li>
+ * </ul>
+ * </li>
+ * <li>Divides file into large stripes</li>
+ * <li>Each stripe includes light-weight indexes that enable the reader to
+ * skip large sets of rows that don't satisfy the filter condition</li>
+ * <li>A file footer that contains meta-information about the file
+ * <ul>
+ * <li>Precise byte range for each stripe</li>
+ * <li>Type information for the file</li>
+ * <li>Any user meta-information</li>
+ * </ul>
+ * </li>
+ * <li>Seek to row number is implemented to support secondary indexes</li>
+ * <li>Support for additional generic compression: LZO, SNAPPY, ZLIB.</li>
+ * </ul>
+ *
+ * <p>
+ * <b>Format:</b>
+ * <pre>
+ * {@code
+ * HEADER (3 bytes) "ORC"
+ * STRIPE (0 or more stripes)
+ * FILE-FOOTER
+ * POST SCRIPT
+ * PS LENGTH (1 byte)
+ * }
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * <b>Stripe:</b>
+ * <pre>
+ * {@code
+ * INDEX-STREAM (0 or more)
+ * DATA-STREAM (0 or more)
+ * STRIPE-FOOTER
+ * }
+ * </pre>
+ * </p>
+ */
+package org.apache.hadoop.hive.ql.io.orc;
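
Given the layout above, a reader locates the metadata back-to-front. A
sketch, assuming an open FileSystem fs and the Path of an existing ORC
file:

    FSDataInputStream file = fs.open(path);
    long size = fs.getFileStatus(path).getLen();
    file.seek(size - 1);
    int psLen = file.read() & 0xff;                       // PS LENGTH (1 byte)
    byte[] psBytes = new byte[psLen];
    file.readFully(size - 1 - psLen, psBytes, 0, psLen);  // POST SCRIPT, never compressed
    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(psBytes);
    long footerStart = size - 1 - psLen - ps.getFooterLength();
    // the FILE-FOOTER starts at footerStart, compressed per ps.getCompression()
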
Added: hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto (added)
+++ hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto Tue Mar 5 20:44:50 2013
@@ -0,0 +1,130 @@
+package org.apache.hadoop.hive.ql.io.orc;
+
+message IntegerStatistics {
+ optional sint64 minimum = 1;
+ optional sint64 maximum = 2;
+ optional sint64 sum = 3;
+}
+
+message DoubleStatistics {
+ optional double minimum = 1;
+ optional double maximum = 2;
+ optional double sum = 3;
+}
+
+message StringStatistics {
+ optional string minimum = 1;
+ optional string maximum = 2;
+}
+
+message BucketStatistics {
+ repeated uint64 count = 1 [packed=true];
+}
+
+message ColumnStatistics {
+ optional uint64 numberOfValues = 1;
+ optional IntegerStatistics intStatistics = 2;
+ optional DoubleStatistics doubleStatistics = 3;
+ optional StringStatistics stringStatistics = 4;
+ optional BucketStatistics bucketStatistics = 5;
+}
+
+message RowIndexEntry {
+ repeated uint64 positions = 1 [packed=true];
+ optional ColumnStatistics statistics = 2;
+}
+
+message RowIndex {
+ repeated RowIndexEntry entry = 1;
+}
+
+message Stream {
+ // if you add new index stream kinds, you need to make sure to update
+ // StreamName to ensure it is added to the stripe in the right area
+ enum Kind {
+ PRESENT = 0;
+ DATA = 1;
+ LENGTH = 2;
+ DICTIONARY_DATA = 3;
+ DICTIONARY_COUNT = 4;
+ NANO_DATA = 5;
+ ROW_INDEX = 6;
+ }
+ required Kind kind = 1;
+ optional uint32 column = 2;
+ optional uint64 length = 3;
+}
+
+message ColumnEncoding {
+ enum Kind {
+ DIRECT = 0;
+ DICTIONARY = 1;
+ }
+ required Kind kind = 1;
+ optional uint32 dictionarySize = 2;
+}
+
+message StripeFooter {
+ repeated Stream streams = 1;
+ repeated ColumnEncoding columns = 2;
+}
+
+message Type {
+ enum Kind {
+ BOOLEAN = 0;
+ BYTE = 1;
+ SHORT = 2;
+ INT = 3;
+ LONG = 4;
+ FLOAT = 5;
+ DOUBLE = 6;
+ STRING = 7;
+ BINARY = 8;
+ TIMESTAMP = 9;
+ LIST = 10;
+ MAP = 11;
+ STRUCT = 12;
+ UNION = 13;
+ }
+ required Kind kind = 1;
+ repeated uint32 subtypes = 2 [packed=true];
+ repeated string fieldNames = 3;
+}
+
+message StripeInformation {
+ optional uint64 offset = 1;
+ optional uint64 indexLength = 2;
+ optional uint64 dataLength = 3;
+ optional uint64 footerLength = 4;
+ optional uint64 numberOfRows = 5;
+}
+
+message UserMetadataItem {
+ required string name = 1;
+ required bytes value = 2;
+}
+
+message Footer {
+ optional uint64 headerLength = 1;
+ optional uint64 contentLength = 2;
+ repeated StripeInformation stripes = 3;
+ repeated Type types = 4;
+ repeated UserMetadataItem metadata = 5;
+ optional uint64 numberOfRows = 6;
+ repeated ColumnStatistics statistics = 7;
+ optional uint32 rowIndexStride = 8;
+}
+
+enum CompressionKind {
+ NONE = 0;
+ ZLIB = 1;
+ SNAPPY = 2;
+ LZO = 3;
+}
+
+// Serialized length must be less than 255 bytes
+message PostScript {
+ optional uint64 footerLength = 1;
+ optional CompressionKind compression = 2;
+ optional uint64 compressionBlockSize = 3;
+}
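
Once generated, these messages are consumed like any protobuf classes. A
small sketch of walking the stripe directory from a parsed footer, where
footerBytes is assumed to have been located and decompressed already:

    OrcProto.Footer footer = OrcProto.Footer.parseFrom(footerBytes);
    for (OrcProto.StripeInformation stripe : footer.getStripesList()) {
      System.out.println("stripe @" + stripe.getOffset()
          + " rows=" + stripe.getNumberOfRows()
          + " index=" + stripe.getIndexLength()
          + " data=" + stripe.getDataLength()
          + " footer=" + stripe.getFooterLength());
    }
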
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestBitFieldReader {
+
+ public void runSeekTest(CompressionCodec codec) throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ final int COUNT = 16384;
+ BitFieldWriter out = new BitFieldWriter(
+ new OutStream("test", 500, codec, collect), 1);
+ TestInStream.PositionCollector[] positions =
+ new TestInStream.PositionCollector[COUNT];
+ for(int i=0; i < COUNT; ++i) {
+ positions[i] = new TestInStream.PositionCollector();
+ out.getPosition(positions[i]);
+ // test runs, non-runs
+ if (i < COUNT / 2) {
+ out.write(i & 1);
+ } else {
+ out.write((i/3) & 1);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+ codec, 500), 1);
+ for(int i=0; i < COUNT; ++i) {
+ int x = in.next();
+ if (i < COUNT / 2) {
+ assertEquals(i & 1, x);
+ } else {
+ assertEquals((i/3) & 1, x);
+ }
+ }
+ for(int i=COUNT-1; i >= 0; --i) {
+ in.seek(positions[i]);
+ int x = in.next();
+ if (i < COUNT / 2) {
+ assertEquals(i & 1, x);
+ } else {
+ assertEquals((i/3) & 1, x);
+ }
+ }
+ }
+
+ @Test
+ public void testUncompressedSeek() throws Exception {
+ runSeekTest(null);
+ }
+
+ @Test
+ public void testCompressedSeek() throws Exception {
+ runSeekTest(new ZlibCodec());
+ }
+
+ @Test
+ public void testBiggerItems() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ final int COUNT = 16384;
+ BitFieldWriter out = new BitFieldWriter(
+ new OutStream("test", 500, null, collect), 3);
+ for(int i=0; i < COUNT; ++i) {
+ // test runs, non-runs
+ if (i < COUNT / 2) {
+ out.write(i & 7);
+ } else {
+ out.write((i/3) & 7);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+ null, 500), 3);
+ for(int i=0; i < COUNT; ++i) {
+ int x = in.next();
+ if (i < COUNT / 2) {
+ assertEquals(i & 7, x);
+ } else {
+ assertEquals((i/3) & 7, x);
+ }
+ }
+ }
+
+ @Test
+ public void testSkips() throws Exception {
+ TestInStream.OutputCollector collect = new TestInStream.OutputCollector();
+ BitFieldWriter out = new BitFieldWriter(
+ new OutStream("test", 100, null, collect), 1);
+ final int COUNT = 16384;
+ for(int i=0; i < COUNT; ++i) {
+ if (i < COUNT/2) {
+ out.write(i & 1);
+ } else {
+ out.write((i/3) & 1);
+ }
+ }
+ out.flush();
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+ null, 100), 1);
+ for(int i=0; i < COUNT; i += 5) {
+ int x = in.next();
+ if (i < COUNT/2) {
+ assertEquals(i & 1, x);
+ } else {
+ assertEquals((i/3) & 1, x);
+ }
+ if (i < COUNT - 5) {
+ in.skip(4);
+ }
+ in.skip(0);
+ }
+ }
+}
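
Distilled from the tests above, the write/read/seek cycle for these bit-packed streams is: record a position with getPosition() before writing a value, flush, rewrap the collected bytes as an InStream, and seek() back to any recorded position. A compact sketch as a JUnit method in the same package, reusing the helper types from TestInStream below:

    @Test
    public void roundTripSketch() throws Exception {
      TestInStream.OutputCollector sink = new TestInStream.OutputCollector();
      BitFieldWriter writer =
          new BitFieldWriter(new OutStream("demo", 100, null, sink), 2);
      TestInStream.PositionCollector mark = new TestInStream.PositionCollector();
      writer.write(3);
      writer.getPosition(mark);   // remember where the second value starts
      writer.write(1);
      writer.flush();
      ByteBuffer buf = ByteBuffer.allocate(sink.buffer.size());
      sink.buffer.setByteBuffer(buf, 0, sink.buffer.size());
      buf.flip();
      BitFieldReader reader =
          new BitFieldReader(InStream.create("demo", buf, null, 100), 2);
      assertEquals(3, reader.next());
      assertEquals(1, reader.next());
      reader.seek(mark);          // jump straight back to the second value
      assertEquals(1, reader.next());
    }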
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestDynamicArray.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestDynamicArray.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestDynamicArray.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestDynamicArray.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.Random;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestDynamicArray {
+
+ @Test
+ public void testByteArray() throws Exception {
+ DynamicByteArray dba = new DynamicByteArray(3, 10);
+ dba.add((byte) 0);
+ dba.add((byte) 1);
+ dba.set(3, (byte) 3);
+ dba.set(2, (byte) 2);
+ dba.add((byte) 4);
+ assertEquals("{0,1,2,3,4}", dba.toString());
+ assertEquals(5, dba.size());
+ byte[] val;
+ val = new byte[0];
+ assertEquals(0, dba.compare(val, 0, 0, 2, 0));
+ assertEquals(-1, dba.compare(val, 0, 0, 2, 1));
+ val = new byte[]{3,42};
+ assertEquals(1, dba.compare(val, 0, 1, 2, 0));
+ assertEquals(1, dba.compare(val, 0, 1, 2, 1));
+ assertEquals(0, dba.compare(val, 0, 1, 3, 1));
+ assertEquals(-1, dba.compare(val, 0, 1, 3, 2));
+ assertEquals(1, dba.compare(val, 0, 2, 3, 1));
+ val = new byte[256];
+ for(int b=-128; b < 128; ++b) {
+ dba.add((byte) b);
+ val[b+128] = (byte) b;
+ }
+ assertEquals(0, dba.compare(val, 0, 256, 5, 256));
+ assertEquals(1, dba.compare(val, 0, 1, 0, 1));
+ assertEquals(1, dba.compare(val, 254, 1, 0, 1));
+ assertEquals(1, dba.compare(val, 120, 1, 64, 1));
+ val = new byte[1024];
+ Random rand = new Random(1701);
+ // nextBytes fills the entire array, so a single call is enough
+ rand.nextBytes(val);
+ dba.add(val, 0, 1024);
+ assertEquals(1285, dba.size());
+ assertEquals(0, dba.compare(val, 0, 1024, 261, 1024));
+ }
+
+ @Test
+ public void testIntArray() throws Exception {
+ DynamicIntArray dia = new DynamicIntArray(10);
+ for(int i=0; i < 10000; ++i) {
+ dia.add(2*i);
+ }
+ assertEquals(10000, dia.size());
+ for(int i=0; i < 10000; ++i) {
+ assertEquals(2*i, dia.get(i));
+ }
+ dia.clear();
+ assertEquals(0, dia.size());
+ dia.add(3);
+ dia.add(12);
+ dia.add(65);
+ assertEquals("{3,12,65}", dia.toString());
+ for(int i=0; i < 5; ++i) {
+ dia.increment(i, 3);
+ }
+ assertEquals("{6,15,68,3,3}", dia.toString());
+ }
+}
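
The two Dynamic*Array classes exercised here are growable arrays backed by a list of fixed-size chunks, so appending never relocates data that has already been written. An illustrative version of the indexing arithmetic (not the Hive implementation, which also supports set/increment/compare):

    import java.util.Arrays;

    class ChunkedIntArray {
      private static final int CHUNK_SIZE = 8 * 1024;
      private int[][] chunks = new int[16][];
      private int size = 0;

      void add(int value) {
        int chunk = size / CHUNK_SIZE;    // which chunk the element lands in
        int offset = size % CHUNK_SIZE;   // position inside that chunk
        if (chunk >= chunks.length) {
          chunks = Arrays.copyOf(chunks, chunks.length * 2);
        }
        if (chunks[chunk] == null) {
          chunks[chunk] = new int[CHUNK_SIZE];
        }
        chunks[chunk][offset] = value;
        size += 1;
      }

      int get(int index) {
        return chunks[index / CHUNK_SIZE][index % CHUNK_SIZE];
      }

      int size() {
        return size;
      }
    }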
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.PrintStream;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestFileDump {
+
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ Path resourceDir = new Path(System.getProperty("test.build.resources",
+ "src" + File.separator + "test" + File.separator + "resources"));
+
+ Configuration conf;
+ FileSystem fs;
+ Path testFilePath;
+
+ @Rule
+ public TestName testCaseName = new TestName();
+
+ @Before
+ public void openFileSystem () throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ fs.setWorkingDirectory(workDir);
+ testFilePath = new Path("TestFileDump." +
+ testCaseName.getMethodName() + ".orc");
+ fs.delete(testFilePath, false);
+ }
+
+ static class MyRecord {
+ int i;
+ long l;
+ String s;
+ MyRecord(int i, long l, String s) {
+ this.i = i;
+ this.l = l;
+ this.s = s;
+ }
+ }
+
+ private static final String outputFilename =
+ File.separator + "orc-file-dump.out";
+
+ private static void checkOutput(String expected,
+ String actual) throws Exception {
+ BufferedReader eStream =
+ new BufferedReader(new FileReader(expected));
+ BufferedReader aStream =
+ new BufferedReader(new FileReader(actual));
+ try {
+ String line = eStream.readLine();
+ while (line != null) {
+ assertEquals(line, aStream.readLine());
+ line = eStream.readLine();
+ }
+ assertNull(eStream.readLine());
+ assertNull(aStream.readLine());
+ } finally {
+ eStream.close();
+ aStream.close();
+ }
+ }
+
+ @Test
+ public void testDump() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestOrcFile.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+ Writer writer = OrcFile.createWriter(fs, testFilePath, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ Random r1 = new Random(1);
+ String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
+ "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
+ "of", "wisdom,", "it", "was", "the", "age", "of", "foolishness,", "it",
+ "was", "the", "epoch", "of", "belief,", "it", "was", "the", "epoch",
+ "of", "incredulity,", "it", "was", "the", "season", "of", "Light,",
+ "it", "was", "the", "season", "of", "Darkness,", "it", "was", "the",
+ "spring", "of", "hope,", "it", "was", "the", "winter", "of", "despair,",
+ "we", "had", "everything", "before", "us,", "we", "had", "nothing",
+ "before", "us,", "we", "were", "all", "going", "direct", "to",
+ "Heaven,", "we", "were", "all", "going", "direct", "the", "other",
+ "way"};
+ for(int i=0; i < 21000; ++i) {
+ writer.addRow(new MyRecord(r1.nextInt(), r1.nextLong(),
+ words[r1.nextInt(words.length)]));
+ }
+ writer.close();
+ PrintStream origOut = System.out;
+ FileOutputStream myOut = new FileOutputStream(workDir + outputFilename);
+
+ // replace stdout and run command
+ System.setOut(new PrintStream(myOut));
+ FileDump.main(new String[]{testFilePath.toString()});
+ System.out.flush();
+ System.setOut(origOut);
+
+ checkOutput(resourceDir + outputFilename, workDir + outputFilename);
+ }
+}
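
FileDump is a plain main() utility, so the test drives it by swapping System.out for a file stream and diffing against a checked-in golden file. The same trick captures the report in memory; a small sketch (the .orc path is hypothetical):

    PrintStream saved = System.out;
    java.io.ByteArrayOutputStream captured = new java.io.ByteArrayOutputStream();
    System.setOut(new PrintStream(captured));
    try {
      FileDump.main(new String[]{"example.orc"});   // hypothetical file
    } finally {
      System.out.flush();
      System.setOut(saved);
    }
    String report = captured.toString();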
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java?rev=1452992&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java Tue Mar 5 20:44:50 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestInStream {
+
+ static class OutputCollector implements OutStream.OutputReceiver {
+ DynamicByteArray buffer = new DynamicByteArray();
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ this.buffer.add(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ static class PositionCollector implements PositionProvider, PositionRecorder {
+ private List<Long> positions = new ArrayList<Long>();
+ private int index = 0;
+
+ @Override
+ public long getNext() {
+ return positions.get(index++);
+ }
+
+ @Override
+ public void addPosition(long offset) {
+ positions.add(offset);
+ }
+ }
+
+ @Test
+ public void testUncompressed() throws Exception {
+ OutputCollector collect = new OutputCollector();
+ OutStream out = new OutStream("test", 100, null, collect);
+ PositionCollector[] positions = new PositionCollector[1024];
+ for(int i=0; i < 1024; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ out.write(i);
+ }
+ out.flush();
+ assertEquals(1024, collect.buffer.size());
+ for(int i=0; i < 1024; ++i) {
+ assertEquals((byte) i, collect.buffer.get(i));
+ }
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ InStream in = InStream.create("test", inBuf, null, 100);
+ assertEquals("uncompressed stream test base: 0 offset: 0 limit: 1024",
+ in.toString());
+ for(int i=0; i < 1024; ++i) {
+ int x = in.read();
+ assertEquals(i & 0xff, x);
+ }
+ for(int i=1023; i >= 0; --i) {
+ in.seek(positions[i]);
+ assertEquals(i & 0xff, in.read());
+ }
+ }
+
+ @Test
+ public void testCompressed() throws Exception {
+ OutputCollector collect = new OutputCollector();
+ CompressionCodec codec = new ZlibCodec();
+ OutStream out = new OutStream("test", 300, codec, collect);
+ PositionCollector[] positions = new PositionCollector[1024];
+ for(int i=0; i < 1024; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ out.write(i);
+ }
+ out.flush();
+ assertEquals("test", out.toString());
+ assertEquals(961, collect.buffer.size());
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ InStream in = InStream.create("test", inBuf, codec, 300);
+ assertEquals("compressed stream test base: 0 offset: 0 limit: 961",
+ in.toString());
+ for(int i=0; i < 1024; ++i) {
+ int x = in.read();
+ assertEquals(i & 0xff, x);
+ }
+ assertEquals(0, in.available());
+ for(int i=1023; i >= 0; --i) {
+ in.seek(positions[i]);
+ assertEquals(i & 0xff, in.read());
+ }
+ }
+
+ @Test
+ public void testCorruptStream() throws Exception {
+ OutputCollector collect = new OutputCollector();
+ CompressionCodec codec = new ZlibCodec();
+ OutStream out = new OutStream("test", 500, codec, collect);
+ PositionCollector[] positions = new PositionCollector[1024];
+ for(int i=0; i < 1024; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ out.write(i);
+ }
+ out.flush();
+
+ // now try to read the stream with a buffer that is too small
+ ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
+ collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
+ inBuf.flip();
+ InStream in = InStream.create("test", inBuf, codec, 100);
+ byte[] contents = new byte[1024];
+ try {
+ in.read(contents);
+ fail();
+ } catch(IllegalArgumentException iae) {
+ // EXPECTED
+ }
+
+ // make a corrupted header
+ inBuf.clear();
+ inBuf.put((byte) 32);
+ inBuf.put((byte) 0);
+ inBuf.flip();
+ in = InStream.create("test2", inBuf, codec, 300);
+ try {
+ in.read();
+ fail();
+ } catch (IllegalStateException ise) {
+ // EXPECTED
+ }
+ }
+}
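
The corrupt-stream test above is easier to read against the chunk framing these compressed streams use: each chunk begins with a 3-byte little-endian header whose value is (chunkLength << 1) | isOriginal, where the low bit marks a chunk stored uncompressed. The two bytes written in the test therefore form a truncated header, and the reader fails before it can decompress anything. A sketch of composing a valid header under that assumption:

    import java.nio.ByteBuffer;

    class ChunkHeaderSketch {
      // Assumed framing: 3 bytes, little-endian,
      // value = (chunkLength << 1) | (isOriginal ? 1 : 0)
      static void writeHeader(ByteBuffer out, int chunkLength, boolean original) {
        int value = (chunkLength << 1) | (original ? 1 : 0);
        out.put((byte) (value & 0xff));
        out.put((byte) ((value >> 8) & 0xff));
        out.put((byte) ((value >> 16) & 0xff));
      }
    }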