You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/12/14 22:36:17 UTC
[4/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
new file mode 100644
index 0000000..5157d4d
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -0,0 +1,2912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.BloomFilterIO;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * memory reaches the stripe size and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized like most Stream objects, so the creation
+ * of an OrcFile writer and all access to a single instance have to happen
+ * on a single thread.
+ *
+ * There are no known cases where these happen between different threads today.
+ *
+ * Caveat: the MemoryManager is created during WriterOptions create, that has
+ * to be confined to a single thread as well.
+ *
+ */
+public class WriterImpl implements Writer, MemoryManager.Callback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
+
+ // NOTE(review): not referenced in the visible part of this file; presumably
+ // the buffer size used when the HDFS output stream is opened - confirm.
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+ // smallest row index stride the constructor accepts when an index is built
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ // threshold above which buffer size will be automatically resized
+ private static final int COLUMN_COUNT_THRESHOLD = 1000;
+
+ private final FileSystem fs;
+ private final Path path;
+ // stripe size taken from the writer options
+ private final long defaultStripeSize;
+ // starts out equal to defaultStripeSize; checkMemory scales it to get the
+ // limit at which the current stripe is flushed
+ private long adjustedStripeSize;
+ // number of rows per row index entry; a value <= 0 disables the index
+ private final int rowIndexStride;
+ private final CompressionKind compress;
+ // codec created from 'compress'; null when the kind is NONE
+ private final CompressionCodec codec;
+ private final boolean addBlockPadding;
+ // buffer size handed to every stream created by the StreamFactory
+ private final int bufferSize;
+ private final long blockSize;
+ private final double paddingTolerance;
+ private final TypeDescription schema;
+
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf outStream around streamFactory
+ private CodedOutputStream protobufWriter = null;
+ private long headerLength;
+ // incremented by StreamFactory.getNextColumnId each time an id is handed out
+ private int columnCount;
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private long rawDataSize = 0;
+ private int rowsInIndex = 0;
+ private int stripesAtLastFlush = -1;
+ private final List<OrcProto.StripeInformation> stripes =
+ new ArrayList<OrcProto.StripeInformation>();
+ private final Map<String, ByteString> userMetadata =
+ new TreeMap<String, ByteString>();
+ private final StreamFactory streamFactory = new StreamFactory();
+ // root of the tree of column writers built from the schema
+ private final TreeWriter treeWriter;
+ // true when rowIndexStride > 0
+ private final boolean buildIndex;
+ private final MemoryManager memoryManager;
+ private final OrcFile.Version version;
+ private final Configuration conf;
+ private final OrcFile.WriterCallback callback;
+ // only non-null when a callback was supplied in the writer options
+ private final OrcFile.WriterContext callbackContext;
+ private final OrcFile.EncodingStrategy encodingStrategy;
+ private final OrcFile.CompressionStrategy compressionStrategy;
+ // indexed by column id; all false when writing the 0.11 file version
+ private final boolean[] bloomFilterColumns;
+ private final double bloomFilterFpp;
+ // set through StreamFactory.useWriterTimeZone; when true the stripe footer
+ // records the writer's default time zone
+ private boolean writeTimeZone;
+
+ /**
+  * Create a writer for a single ORC file.
+  * All settings are copied out of the options, the tree of column writers is
+  * built from the schema and, as the very last step, the writer registers
+  * itself with the memory manager.
+  * @param fs the file system the file will be written to
+  * @param path the path of the file
+  * @param opts the options that configure this writer
+  * @throws IOException if creating a codec or a column writer fails
+  * @throws IllegalArgumentException if a row index is requested with a
+  *         stride smaller than MIN_ROW_INDEX_STRIDE
+  */
+ public WriterImpl(FileSystem fs,
+                   Path path,
+                   OrcFile.WriterOptions opts) throws IOException {
+   this.fs = fs;
+   this.path = path;
+   this.conf = opts.getConfiguration();
+   this.callback = opts.getCallback();
+   this.schema = opts.getSchema();
+   // the context handed to callbacks only exists when a callback was supplied
+   if (this.callback == null) {
+     this.callbackContext = null;
+   } else {
+     this.callbackContext = new OrcFile.WriterContext() {
+       @Override
+       public Writer getWriter() {
+         return WriterImpl.this;
+       }
+     };
+   }
+   this.adjustedStripeSize = opts.getStripeSize();
+   this.defaultStripeSize = opts.getStripeSize();
+   this.version = opts.getVersion();
+   this.encodingStrategy = opts.getEncodingStrategy();
+   this.compressionStrategy = opts.getCompressionStrategy();
+   this.addBlockPadding = opts.getBlockPadding();
+   this.blockSize = opts.getBlockSize();
+   this.paddingTolerance = opts.getPaddingTolerance();
+   this.compress = opts.getCompress();
+   this.rowIndexStride = opts.getRowIndexStride();
+   this.memoryManager = opts.getMemoryManager();
+   this.buildIndex = this.rowIndexStride > 0;
+   this.codec = createCodec(this.compress);
+
+   final int columnTotal = this.schema.getMaximumId() + 1;
+   this.bufferSize = getEstimatedBufferSize(this.defaultStripeSize,
+       columnTotal, opts.getBufferSize());
+   if (this.version != OrcFile.Version.V_0_11) {
+     this.bloomFilterColumns =
+         OrcUtils.includeColumns(opts.getBloomFilterColumns(), this.schema);
+   } else {
+     // do not write bloom filters for ORC v11
+     this.bloomFilterColumns = new boolean[columnTotal];
+   }
+   this.bloomFilterFpp = opts.getBloomFilterFpp();
+   this.treeWriter = createTreeWriter(this.schema, this.streamFactory, false);
+
+   final boolean strideTooSmall = this.rowIndexStride < MIN_ROW_INDEX_STRIDE;
+   if (this.buildIndex && strideTooSmall) {
+     throw new IllegalArgumentException("Row stride must be at least " +
+         MIN_ROW_INDEX_STRIDE);
+   }
+
+   // ensure that we are able to handle callbacks before we register ourselves
+   this.memoryManager.addWriter(path, opts.getStripeSize(), this);
+ }
+
+ /**
+  * Estimate the compression buffer size for a writer.
+  * The stripe size is divided by 20 buffers per column, rounded to one of
+  * the sizes returned by getClosestBufferSize and finally capped at the
+  * requested buffer size.
+  * @param stripeSize the stripe size in bytes
+  * @param numColumns the number of columns in the schema; must not be 0,
+  *                   otherwise the division throws ArithmeticException
+  * @param bs the requested buffer size, used as the upper bound
+  * @return the buffer size to use, never larger than bs
+  */
+ @VisibleForTesting
+ public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+                                          int bs) {
+   // The worst case is that there are 2 big streams per a column and
+   // we want to guarantee that each stream gets ~10 buffers.
+   // This keeps buffers small enough that we don't get really small stripe
+   // sizes.
+   // The arithmetic is done in long and clamped before narrowing, so that a
+   // very large stripe size cannot wrap around to a bogus int value.
+   final long perBuffer = stripeSize / (20L * numColumns);
+   final long clamped =
+       Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, perBuffer));
+   int estBufferSize = getClosestBufferSize((int) clamped);
+   if (estBufferSize > bs) {
+     estBufferSize = bs;
+   } else {
+     LOG.info("WIDE TABLE - Number of columns: " + numColumns +
+         " Chosen compression buffer size: " + estBufferSize);
+   }
+   return estBufferSize;
+ }
+
+ /**
+  * Round an estimated buffer size up to the nearest supported size.
+  * The supported sizes are the powers of two from 4K to 256K; anything
+  * above 128K maps to 256K and anything at or below 4K maps to 4K.
+  * @param estBufferSize the estimated size in bytes
+  * @return one of 4K, 8K, 16K, 32K, 64K, 128K or 256K
+  */
+ private static int getClosestBufferSize(int estBufferSize) {
+   final int smallest = 4 * 1024;
+   final int largest = 256 * 1024;
+   // walk 4K, 8K, ... 128K and stop at the first size that is big enough
+   for (int candidate = smallest; candidate < largest; candidate *= 2) {
+     if (estBufferSize <= candidate) {
+       return candidate;
+     }
+   }
+   return largest;
+ }
+
+ /**
+  * Create the codec that implements the given kind of compression.
+  * The LZO codec is looked up by class name at runtime, first through the
+  * thread's context class loader and otherwise through the loader of this
+  * class.
+  * @param kind the kind of compression
+  * @return the codec, or null when the kind is NONE
+  * @throws IllegalArgumentException if the kind is unknown or the LZO codec
+  *         cannot be loaded or instantiated
+  */
+ public static CompressionCodec createCodec(CompressionKind kind) {
+   switch (kind) {
+     case NONE:
+       return null;
+     case ZLIB:
+       return new ZlibCodec();
+     case SNAPPY:
+       return new SnappyCodec();
+     case LZO: {
+       final ClassLoader contextLoader =
+           Thread.currentThread().getContextClassLoader();
+       final ClassLoader loader = contextLoader != null
+           ? contextLoader
+           : WriterImpl.class.getClassLoader();
+       try {
+         @SuppressWarnings("unchecked")
+         Class<? extends CompressionCodec> lzoClass =
+             (Class<? extends CompressionCodec>)
+                 loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+         return lzoClass.newInstance();
+       } catch (ClassNotFoundException e) {
+         throw new IllegalArgumentException("LZO is not available.", e);
+       } catch (InstantiationException e) {
+         throw new IllegalArgumentException("Problem initializing LZO", e);
+       } catch (IllegalAccessException e) {
+         throw new IllegalArgumentException("Insufficient access to LZO", e);
+       }
+     }
+     default:
+       throw new IllegalArgumentException("Unknown compression codec: " +
+           kind);
+   }
+ }
+
+ /**
+  * Memory manager callback: flush the current stripe when its estimated
+  * size exceeds the scaled stripe size.
+  * @param newScale the factor applied to the adjusted stripe size
+  * @return true if the stripe was flushed
+  * @throws IOException if estimating or flushing the stripe fails
+  */
+ @Override
+ public boolean checkMemory(double newScale) throws IOException {
+   final long limit = Math.round(adjustedStripeSize * newScale);
+   final long size = estimateStripeSize();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+         limit);
+   }
+   if (size <= limit) {
+     return false;
+   }
+   flushStripe();
+   return true;
+ }
+
+ /**
+  * Holds the contents of one stream while the stripe is being buffered.
+  * The TreeWriters write into the outStream, which hands every finished
+  * buffer back to this receiver; the buffers are kept in the output list
+  * until the stripe is written and the whole stream is spilled to the file.
+  */
+ private class BufferedStream implements OutStream.OutputReceiver {
+   private final OutStream outStream;
+   private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+   BufferedStream(String name, int bufferSize,
+                  CompressionCodec codec) throws IOException {
+     this.outStream = new OutStream(name, bufferSize, codec, this);
+   }
+
+   /**
+    * Accept a finished buffer from the outStream and keep it.
+    * @param buffer the buffer to save
+    */
+   @Override
+   public void output(ByteBuffer buffer) {
+     this.output.add(buffer);
+   }
+
+   /**
+    * Get the number of bytes in buffers that are allocated to this stream:
+    * the capacity of every saved buffer plus whatever the outStream itself
+    * reports.
+    * @return number of bytes in buffers
+    */
+   public long getBufferSize() {
+     long saved = 0;
+     for (int i = 0; i < output.size(); ++i) {
+       saved += output.get(i).capacity();
+     }
+     return outStream.getBufferSize() + saved;
+   }
+
+   /**
+    * Flush the outStream.
+    * @throws IOException
+    */
+   public void flush() throws IOException {
+     this.outStream.flush();
+   }
+
+   /**
+    * Clear the outStream and drop all of the saved buffers.
+    * @throws IOException
+    */
+   public void clear() throws IOException {
+     this.outStream.clear();
+     this.output.clear();
+   }
+
+   /**
+    * Report the suppress flag of the outStream.
+    * @return value of suppress flag
+    */
+   public boolean isSuppressed() {
+     return this.outStream.isSuppressed();
+   }
+
+   /**
+    * Get the number of bytes that will be written to the output, which is
+    * the sum of the remaining bytes of the saved buffers. Assumes the
+    * stream has already been flushed.
+    * @return the number of bytes
+    */
+   public long getOutputSize() {
+     long total = 0;
+     for (int i = 0; i < output.size(); ++i) {
+       total += output.get(i).remaining();
+     }
+     return total;
+   }
+
+   /**
+    * Copy the remaining bytes of every saved buffer to the OutputStream.
+    * @param out the stream to write to
+    * @throws IOException
+    */
+   void spillTo(OutputStream out) throws IOException {
+     for (ByteBuffer chunk : output) {
+       final byte[] bytes = chunk.array();
+       final int start = chunk.arrayOffset() + chunk.position();
+       out.write(bytes, start, chunk.remaining());
+     }
+   }
+
+   @Override
+   public String toString() {
+     return this.outStream.toString();
+   }
+ }
+
+ /**
+  * An output receiver that does no buffering: every ByteBuffer it receives
+  * is written to the underlying output stream straight away.
+  */
+ private class DirectStream implements OutStream.OutputReceiver {
+   private final FSDataOutputStream output;
+
+   DirectStream(FSDataOutputStream output) {
+     this.output = output;
+   }
+
+   /**
+    * Write the remaining bytes of the buffer's backing array to the stream.
+    * @param buffer the buffer to write
+    * @throws IOException if the write fails
+    */
+   @Override
+   public void output(ByteBuffer buffer) throws IOException {
+     final byte[] bytes = buffer.array();
+     final int start = buffer.arrayOffset() + buffer.position();
+     this.output.write(bytes, start, buffer.remaining());
+   }
+ }
+
+ /**
+  * A PositionRecorder that appends every recorded position to a row index
+  * entry builder.
+  */
+ private static class RowIndexPositionRecorder implements PositionRecorder {
+   private final OrcProto.RowIndexEntry.Builder builder;
+
+   RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder entryBuilder) {
+     this.builder = entryBuilder;
+   }
+
+   /**
+    * Append the position to the entry being built.
+    * @param position the position to record
+    */
+   @Override
+   public void addPosition(long position) {
+     this.builder.addPositions(position);
+   }
+ }
+
+ /**
+  * The narrow view of the Writer that is handed to the TreeWriters, so that
+  * they only reach the pieces of writer state exposed here.
+  */
+ private class StreamFactory {
+   /**
+    * Look up, or create on first use, the stream that stores one part of a
+    * column. The compression modifiers are chosen from the kind of stream.
+    * @param column the column id for the stream
+    * @param kind the kind of stream
+    * @return The output outStream that the section needs to be written to.
+    * @throws IOException
+    */
+   public OutStream createStream(int column,
+                                 OrcProto.Stream.Kind kind
+                                 ) throws IOException {
+     final StreamName name = new StreamName(column, kind);
+     final EnumSet<CompressionCodec.Modifier> modifiers;
+
+     switch (kind) {
+       case BLOOM_FILTER:
+       case DATA:
+       case DICTIONARY_DATA:
+         modifiers = EnumSet.of(
+             getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED
+                 ? CompressionCodec.Modifier.FAST
+                 : CompressionCodec.Modifier.DEFAULT,
+             CompressionCodec.Modifier.TEXT);
+         break;
+       case LENGTH:
+       case DICTIONARY_COUNT:
+       case PRESENT:
+       case ROW_INDEX:
+       case SECONDARY:
+         // easily compressed using the fastest modes
+         modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
+             CompressionCodec.Modifier.BINARY);
+         break;
+       default:
+         LOG.warn("Missing ORC compression modifiers for " + kind);
+         modifiers = null;
+         break;
+     }
+
+     final BufferedStream existing = streams.get(name);
+     if (existing != null) {
+       return existing.outStream;
+     }
+     final String label = name.toString();
+     final CompressionCodec streamCodec;
+     if (codec == null) {
+       streamCodec = null;
+     } else {
+       streamCodec = codec.modify(modifiers);
+     }
+     final BufferedStream created =
+         new BufferedStream(label, bufferSize, streamCodec);
+     streams.put(name, created);
+     return created.outStream;
+   }
+
+   /**
+    * Hand out the next column id and advance the writer's column counter.
+    * @return a number from 0 to the number of columns - 1
+    */
+   public int getNextColumnId() {
+     final int next = columnCount;
+     columnCount = next + 1;
+     return next;
+   }
+
+   /**
+    * Get the stride rate of the row index.
+    * @return the row index stride of the writer
+    */
+   public int getRowIndexStride() {
+     return WriterImpl.this.rowIndexStride;
+   }
+
+   /**
+    * Should be building the row index.
+    * @return true if we are building the index
+    */
+   public boolean buildIndex() {
+     return WriterImpl.this.buildIndex;
+   }
+
+   /**
+    * Is the ORC file compressed?
+    * @return true when the writer has a codec
+    */
+   public boolean isCompressed() {
+     return WriterImpl.this.codec != null;
+   }
+
+   /**
+    * Get the encoding strategy to use.
+    * @return encoding strategy
+    */
+   public OrcFile.EncodingStrategy getEncodingStrategy() {
+     return WriterImpl.this.encodingStrategy;
+   }
+
+   /**
+    * Get the compression strategy to use.
+    * @return compression strategy
+    */
+   public OrcFile.CompressionStrategy getCompressionStrategy() {
+     return WriterImpl.this.compressionStrategy;
+   }
+
+   /**
+    * Get the bloom filter columns
+    * @return the writer's array of bloom filter flags (not a copy)
+    */
+   public boolean[] getBloomFilterColumns() {
+     return WriterImpl.this.bloomFilterColumns;
+   }
+
+   /**
+    * Get bloom filter false positive percentage.
+    * @return fpp
+    */
+   public double getBloomFilterFPP() {
+     return WriterImpl.this.bloomFilterFpp;
+   }
+
+   /**
+    * Get the writer's configuration.
+    * @return configuration
+    */
+   public Configuration getConfiguration() {
+     return WriterImpl.this.conf;
+   }
+
+   /**
+    * Get the version of the file to write.
+    * @return the file version
+    */
+   public OrcFile.Version getVersion() {
+     return WriterImpl.this.version;
+   }
+
+   /**
+    * Set whether the writer time zone is recorded.
+    * @param val the new value of the flag
+    */
+   public void useWriterTimeZone(boolean val) {
+     WriterImpl.this.writeTimeZone = val;
+   }
+
+   /**
+    * Is the writer time zone recorded?
+    * @return the current value of the flag
+    */
+   public boolean hasWriterTimeZone() {
+     return WriterImpl.this.writeTimeZone;
+   }
+ }
+
+ /**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have children tree writers that write the children
+ * types.
+ *
+ * This base class handles the PRESENT stream, the three levels of column
+ * statistics (index, stripe and file), the row index and the optional
+ * bloom filter index; it does not write any column values itself.
+ */
+ private abstract static class TreeWriter {
+ // the column id this writer was created for
+ protected final int id;
+ // writes the PRESENT bits; null when the column was created as not nullable
+ protected final BitFieldWriter isPresent;
+ private final boolean isCompressed;
+ // statistics for the rows since the last row index entry
+ protected final ColumnStatisticsImpl indexStatistics;
+ // statistics for the current stripe, merged from indexStatistics
+ protected final ColumnStatisticsImpl stripeColStatistics;
+ // statistics for the file, merged from stripeColStatistics
+ private final ColumnStatisticsImpl fileStatistics;
+ // empty in this base class; replaced by writers that have children
+ protected TreeWriter[] childrenWriters;
+ // records positions into rowIndexEntry
+ protected final RowIndexPositionRecorder rowIndexPosition;
+ private final OrcProto.RowIndex.Builder rowIndex;
+ private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+ // null when no row index is being built
+ private final PositionedOutputStream rowIndexStream;
+ // the four bloom filter members below are null unless createBloomFilter
+ private final PositionedOutputStream bloomFilterStream;
+ protected final BloomFilterIO bloomFilter;
+ protected final boolean createBloomFilter;
+ private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+ private final OrcProto.BloomFilter.Builder bloomFilterEntry;
+ // true once a null has been seen in the current stripe
+ private boolean foundNulls;
+ // only assigned when the column is nullable
+ private OutStream isPresentOutStream;
+ private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
+ private final StreamFactory streamFactory;
+
+ /**
+ * Create a tree writer.
+ * @param columnId the column id of the column to write
+ * @param schema the row schema
+ * @param streamFactory limited access to the Writer's data.
+ * @param nullable can the value be null?
+ * @throws IOException
+ */
+ TreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ this.streamFactory = streamFactory;
+ this.isCompressed = streamFactory.isCompressed();
+ this.id = columnId;
+ // a PRESENT stream is only created for nullable columns
+ if (nullable) {
+ isPresentOutStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.PRESENT);
+ isPresent = new BitFieldWriter(isPresentOutStream, 1);
+ } else {
+ isPresent = null;
+ }
+ this.foundNulls = false;
+ createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+ indexStatistics = ColumnStatisticsImpl.create(schema);
+ stripeColStatistics = ColumnStatisticsImpl.create(schema);
+ fileStatistics = ColumnStatisticsImpl.create(schema);
+ childrenWriters = new TreeWriter[0];
+ rowIndex = OrcProto.RowIndex.newBuilder();
+ rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+ rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+ stripeStatsBuilders = Lists.newArrayList();
+ // the ROW_INDEX stream only exists when the writer builds an index
+ if (streamFactory.buildIndex()) {
+ rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+ } else {
+ rowIndexStream = null;
+ }
+ if (createBloomFilter) {
+ bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+ bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+ bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+ bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+ streamFactory.getBloomFilterFPP());
+ } else {
+ bloomFilterEntry = null;
+ bloomFilterIndex = null;
+ bloomFilterStream = null;
+ bloomFilter = null;
+ }
+ }
+
+ /**
+ * Get the builder of the row index for the current stripe.
+ * @return the row index builder
+ */
+ protected OrcProto.RowIndex.Builder getRowIndex() {
+ return rowIndex;
+ }
+
+ /**
+ * Get the statistics of this column for the current stripe.
+ * @return the stripe level statistics
+ */
+ protected ColumnStatisticsImpl getStripeStatistics() {
+ return stripeColStatistics;
+ }
+
+ /**
+ * Get the builder of the row index entry that is currently being filled.
+ * @return the row index entry builder
+ */
+ protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+ return rowIndexEntry;
+ }
+
+ /**
+ * Create the integer writer that matches the write format.
+ * @param output the stream the integers are written to
+ * @param signed are the values signed?
+ * @param isDirectV2 use the version 2 run length encoding?
+ * @param writer the factory that supplies the encoding strategy
+ * @return a RunLengthIntegerWriterV2 if isDirectV2 (with aligned bit
+ * packing when the encoding strategy is SPEED), otherwise a
+ * RunLengthIntegerWriter
+ */
+ IntegerWriter createIntegerWriter(PositionedOutputStream output,
+ boolean signed, boolean isDirectV2,
+ StreamFactory writer) {
+ if (isDirectV2) {
+ boolean alignedBitpacking = false;
+ if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
+ alignedBitpacking = true;
+ }
+ return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+ } else {
+ return new RunLengthIntegerWriter(output, signed);
+ }
+ }
+
+ /**
+ * Is the file written in a version other than 0.11?
+ * @param writer the factory that supplies the file version
+ * @return true unless the version is V_0_11
+ */
+ boolean isNewWriteFormat(StreamFactory writer) {
+ return writer.getVersion() != OrcFile.Version.V_0_11;
+ }
+
+ /**
+ * Handle the top level object write.
+ *
+ * This default method is used for all types except structs, which are the
+ * typical case. VectorizedRowBatch assumes the top level object is a
+ * struct, so we use the first column for all other types.
+ * @param batch the batch to write from
+ * @param offset the row to start on
+ * @param length the number of rows to write
+ * @throws IOException
+ */
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
+ writeBatch(batch.cols[0], offset, length);
+ }
+
+ /**
+ * Write the values from the given vector from offset for length elements.
+ *
+ * This base implementation only writes the PRESENT bits (when the column
+ * is nullable) and updates the value count and null flag of the index
+ * statistics; subclasses write the values themselves.
+ * @param vector the vector to write from
+ * @param offset the first value from the vector to write
+ * @param length the number of values from the vector to write
+ * @throws IOException
+ */
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ if (vector.noNulls) {
+ // every value is present
+ indexStatistics.increment(length);
+ if (isPresent != null) {
+ for (int i = 0; i < length; ++i) {
+ isPresent.write(1);
+ }
+ }
+ } else {
+ if (vector.isRepeating) {
+ // a repeating vector keeps its single null flag in element 0
+ boolean isNull = vector.isNull[0];
+ if (isPresent != null) {
+ for (int i = 0; i < length; ++i) {
+ isPresent.write(isNull ? 0 : 1);
+ }
+ }
+ if (isNull) {
+ foundNulls = true;
+ indexStatistics.setNull();
+ } else {
+ indexStatistics.increment(length);
+ }
+ } else {
+ // count the number of non-null values
+ int nonNullCount = 0;
+ for(int i = 0; i < length; ++i) {
+ boolean isNull = vector.isNull[i + offset];
+ if (!isNull) {
+ nonNullCount += 1;
+ }
+ if (isPresent != null) {
+ isPresent.write(isNull ? 0 : 1);
+ }
+ }
+ indexStatistics.increment(nonNullCount);
+ if (nonNullCount != length) {
+ foundNulls = true;
+ indexStatistics.setNull();
+ }
+ }
+ }
+ }
+
+ /**
+ * Drop the leading positions, which belong to the isPresent stream, from
+ * every entry of the row index. Used when the isPresent stream is
+ * suppressed.
+ */
+ private void removeIsPresentPositions() {
+ for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+ OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+ List<Long> positions = entry.getPositionsList();
+ // bit streams use 3 positions if uncompressed, 4 if compressed
+ positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+ entry.clearPositions();
+ entry.addAllPositions(positions);
+ }
+ }
+
+ /**
+ * Write the stripe out to the file.
+ * @param builder the stripe footer that contains the information about the
+ * layout of the stripe. The TreeWriter is required to update
+ * the footer with its information.
+ * @param requiredIndexEntries the number of index entries that are
+ * required. this is to check to make sure the
+ * row index is well formed.
+ * @throws IOException
+ * @throws IllegalArgumentException if a row index is built and it does not
+ * hold exactly requiredIndexEntries entries
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ if (isPresent != null) {
+ isPresent.flush();
+
+ // if no nulls are found in a stream, then suppress the stream
+ if(!foundNulls) {
+ isPresentOutStream.suppress();
+ // since isPresent bitstream is suppressed, update the index to
+ // remove the positions of the isPresent stream
+ if (rowIndexStream != null) {
+ removeIsPresentPositions();
+ }
+ }
+ }
+
+ // merge stripe-level column statistics to file statistics and write it to
+ // stripe statistics
+ OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
+ writeStripeStatistics(stripeStatsBuilder, this);
+ stripeStatsBuilders.add(stripeStatsBuilder);
+
+ // reset the flag for next stripe
+ foundNulls = false;
+
+ builder.addColumns(getEncoding());
+ // the time zone recorded is the JVM default at the time of the write
+ if (streamFactory.hasWriterTimeZone()) {
+ builder.setWriterTimezone(TimeZone.getDefault().getID());
+ }
+ if (rowIndexStream != null) {
+ if (rowIndex.getEntryCount() != requiredIndexEntries) {
+ throw new IllegalArgumentException("Column has wrong number of " +
+ "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+ requiredIndexEntries);
+ }
+ rowIndex.build().writeTo(rowIndexStream);
+ rowIndexStream.flush();
+ }
+ // start the next stripe with an empty index
+ rowIndex.clear();
+ rowIndexEntry.clear();
+
+ // write the bloom filter to out stream
+ if (bloomFilterStream != null) {
+ bloomFilterIndex.build().writeTo(bloomFilterStream);
+ bloomFilterStream.flush();
+ bloomFilterIndex.clear();
+ bloomFilterEntry.clear();
+ }
+ }
+
+ /**
+ * Merge the stripe statistics of the given writer into its file
+ * statistics, add them to the builder and reset them, then do the same
+ * for each of its children recursively.
+ * @param builder the stripe statistics being built
+ * @param treeWriter the writer whose statistics are added
+ */
+ private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
+ TreeWriter treeWriter) {
+ treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
+ builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
+ treeWriter.stripeColStatistics.reset();
+ for (TreeWriter child : treeWriter.getChildrenWriters()) {
+ writeStripeStatistics(builder, child);
+ }
+ }
+
+ /**
+ * Get the writers of the children columns.
+ * @return the children writers; an empty array unless a subclass set them
+ */
+ TreeWriter[] getChildrenWriters() {
+ return childrenWriters;
+ }
+
+ /**
+ * Get the encoding for this column.
+ * @return the information about the encoding of this column, which is
+ * DIRECT unless a subclass overrides this method
+ */
+ OrcProto.ColumnEncoding getEncoding() {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ /**
+ * Create a row index entry with the previous location and the current
+ * index statistics. Also merges the index statistics into the stripe
+ * statistics before they are cleared and adds a bloom filter entry when
+ * bloom filters are enabled. Finally, it records the start of the
+ * next index and ensures all of the children columns also create an entry.
+ * @throws IOException
+ */
+ void createRowIndexEntry() throws IOException {
+ stripeColStatistics.merge(indexStatistics);
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ rowIndex.addEntry(rowIndexEntry);
+ rowIndexEntry.clear();
+ addBloomFilterEntry();
+ // the positions recorded here start the next entry
+ recordPosition(rowIndexPosition);
+ for(TreeWriter child: childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ /**
+ * Add the current bloom filter to the bloom filter index and reset it.
+ * Does nothing when no bloom filter is created for this column.
+ */
+ void addBloomFilterEntry() {
+ if (createBloomFilter) {
+ bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+ bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
+ bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+ bloomFilter.reset();
+ bloomFilterEntry.clear();
+ }
+ }
+
+ /**
+ * Record the current position in each of this column's streams. This base
+ * implementation only records the isPresent stream, if there is one.
+ * @param recorder where should the locations be recorded
+ * @throws IOException
+ */
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ if (isPresent != null) {
+ isPresent.getPosition(recorder);
+ }
+ }
+
+ /**
+ * Estimate how much memory the writer is consuming excluding the streams.
+ * This base implementation only sums the estimates of the children.
+ * @return the number of bytes.
+ */
+ long estimateMemory() {
+ long result = 0;
+ for (TreeWriter child: childrenWriters) {
+ result += child.estimateMemory();
+ }
+ return result;
+ }
+ }
+
+ /**
+  * Writes a boolean column as one bit per value in the DATA stream and
+  * keeps the boolean statistics of the index up to date.
+  */
+ private static class BooleanTreeWriter extends TreeWriter {
+   private final BitFieldWriter writer;
+
+   BooleanTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     PositionedOutputStream dataStream = writer.createStream(id,
+         OrcProto.Stream.Kind.DATA);
+     this.writer = new BitFieldWriter(dataStream, 1);
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Write the non-null values of the vector; any non-zero long is written
+    * as 1 and counted as true.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final LongColumnVector longs = (LongColumnVector) vector;
+     if (longs.isRepeating) {
+       // a repeating vector holds its only value and null flag in element 0
+       if (!longs.noNulls && longs.isNull[0]) {
+         return;
+       }
+       final boolean repeated = longs.vector[0] != 0;
+       indexStatistics.updateBoolean(repeated, length);
+       final int bit = repeated ? 1 : 0;
+       for (int remaining = length; remaining > 0; --remaining) {
+         writer.write(bit);
+       }
+       return;
+     }
+     for (int i = 0; i < length; ++i) {
+       final int row = i + offset;
+       if (!longs.noNulls && longs.isNull[row]) {
+         continue;
+       }
+       final boolean current = longs.vector[row] != 0;
+       writer.write(current ? 1 : 0);
+       indexStatistics.updateBoolean(current, 1);
+     }
+   }
+
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     this.writer.flush();
+     // record the start of the next stripe's first index entry
+     recordPosition(rowIndexPosition);
+   }
+
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     this.writer.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Writes a byte column through a RunLengthByteWriter on the DATA stream
+  * and keeps the integer statistics and the optional bloom filter up to
+  * date.
+  */
+ private static class ByteTreeWriter extends TreeWriter {
+   private final RunLengthByteWriter writer;
+
+   ByteTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     final OutStream dataStream = writer.createStream(id,
+         OrcProto.Stream.Kind.DATA);
+     this.writer = new RunLengthByteWriter(dataStream);
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Write the non-null values of the vector, each narrowed to a byte.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final LongColumnVector longs = (LongColumnVector) vector;
+     if (longs.isRepeating) {
+       // a repeating vector holds its only value and null flag in element 0
+       if (!longs.noNulls && longs.isNull[0]) {
+         return;
+       }
+       final byte repeated = (byte) longs.vector[0];
+       indexStatistics.updateInteger(repeated, length);
+       if (createBloomFilter) {
+         bloomFilter.addLong(repeated);
+       }
+       for (int remaining = length; remaining > 0; --remaining) {
+         writer.write(repeated);
+       }
+       return;
+     }
+     for (int i = 0; i < length; ++i) {
+       final int row = i + offset;
+       if (!longs.noNulls && longs.isNull[row]) {
+         continue;
+       }
+       final byte current = (byte) longs.vector[row];
+       writer.write(current);
+       indexStatistics.updateInteger(current, 1);
+       if (createBloomFilter) {
+         bloomFilter.addLong(current);
+       }
+     }
+   }
+
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     this.writer.flush();
+     // record the start of the next stripe's first index entry
+     recordPosition(rowIndexPosition);
+   }
+
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     this.writer.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Writes an integer column through a signed IntegerWriter on the DATA
+  * stream and keeps the integer statistics and the optional bloom filter up
+  * to date. The run length encoding version follows the file version.
+  */
+ private static class IntegerTreeWriter extends TreeWriter {
+   private final IntegerWriter writer;
+   private boolean isDirectV2 = true;
+
+   IntegerTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     OutStream dataStream = writer.createStream(id,
+         OrcProto.Stream.Kind.DATA);
+     this.isDirectV2 = isNewWriteFormat(writer);
+     this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * @return DIRECT_V2 for the new write format, otherwise DIRECT
+    */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     final OrcProto.ColumnEncoding.Kind kind = isDirectV2
+         ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+         : OrcProto.ColumnEncoding.Kind.DIRECT;
+     return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+   }
+
+   /**
+    * Write the non-null values of the vector.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final LongColumnVector longs = (LongColumnVector) vector;
+     if (longs.isRepeating) {
+       // a repeating vector holds its only value and null flag in element 0
+       if (!longs.noNulls && longs.isNull[0]) {
+         return;
+       }
+       final long repeated = longs.vector[0];
+       indexStatistics.updateInteger(repeated, length);
+       if (createBloomFilter) {
+         bloomFilter.addLong(repeated);
+       }
+       for (int remaining = length; remaining > 0; --remaining) {
+         writer.write(repeated);
+       }
+       return;
+     }
+     for (int i = 0; i < length; ++i) {
+       final int row = i + offset;
+       if (!longs.noNulls && longs.isNull[row]) {
+         continue;
+       }
+       final long current = longs.vector[row];
+       writer.write(current);
+       indexStatistics.updateInteger(current, 1);
+       if (createBloomFilter) {
+         bloomFilter.addLong(current);
+       }
+     }
+   }
+
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     this.writer.flush();
+     // record the start of the next stripe's first index entry
+     recordPosition(rowIndexPosition);
+   }
+
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     this.writer.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Tree writer for float columns. Values are read from a
+  * {@code DoubleColumnVector}, narrowed to {@code float} and written to the
+  * DATA stream.
+  */
+ private static class FloatTreeWriter extends TreeWriter {
+   private final PositionedOutputStream stream;
+   private final SerializationUtils utils;
+
+   FloatTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     this.stream = writer.createStream(id,
+         OrcProto.Stream.Kind.DATA);
+     this.utils = new SerializationUtils();
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}.
+    *
+    * @param vector the column vector; must be a {@code DoubleColumnVector}
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if the stream cannot be written
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     DoubleColumnVector vec = (DoubleColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         float value = (float) vec.vector[0];
+         if (createBloomFilter) {
+           bloomFilter.addDouble(value);
+         }
+         for(int i=0; i < length; ++i) {
+           // updateDouble takes no repetition count, so it is called once
+           // per row; a single call would make per-row aggregates (the ORC
+           // double statistics carry a sum) differ from what the
+           // non-repeating branch records for the same data.
+           indexStatistics.updateDouble(value);
+           utils.writeFloat(stream, value);
+         }
+       }
+     } else {
+       for(int i=0; i < length; ++i) {
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           float value = (float) vec.vector[i + offset];
+           utils.writeFloat(stream, value);
+           indexStatistics.updateDouble(value);
+           if (createBloomFilter) {
+             bloomFilter.addDouble(value);
+           }
+         }
+       }
+     }
+   }
+
+   /** Flushes the data stream after the parent has written the stripe. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     stream.flush();
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Records the parent's positions followed by the data stream's. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     stream.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Tree writer for double columns. Values are read from a
+  * {@code DoubleColumnVector} and written to the DATA stream.
+  */
+ private static class DoubleTreeWriter extends TreeWriter {
+   private final PositionedOutputStream stream;
+   private final SerializationUtils utils;
+
+   DoubleTreeWriter(int columnId,
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     this.stream = writer.createStream(id,
+         OrcProto.Stream.Kind.DATA);
+     this.utils = new SerializationUtils();
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}.
+    *
+    * @param vector the column vector; must be a {@code DoubleColumnVector}
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if the stream cannot be written
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     DoubleColumnVector vec = (DoubleColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         double value = vec.vector[0];
+         if (createBloomFilter) {
+           bloomFilter.addDouble(value);
+         }
+         for(int i=0; i < length; ++i) {
+           // updateDouble takes no repetition count, so it is called once
+           // per row; a single call would make per-row aggregates (the ORC
+           // double statistics carry a sum) differ from what the
+           // non-repeating branch records for the same data.
+           indexStatistics.updateDouble(value);
+           utils.writeDouble(stream, value);
+         }
+       }
+     } else {
+       for(int i=0; i < length; ++i) {
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           double value = vec.vector[i + offset];
+           utils.writeDouble(stream, value);
+           indexStatistics.updateDouble(value);
+           if (createBloomFilter) {
+             bloomFilter.addDouble(value);
+           }
+         }
+       }
+     }
+   }
+
+   /** Flushes the data stream after the parent has written the stripe. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     stream.flush();
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Records the parent's positions followed by the data stream's. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     stream.getPosition(recorder);
+   }
+ }
+
+ /**
+ * Shared implementation for the string, char and varchar writers. While
+ * {@code useDictionaryEncoding} is true, subclasses add each value to
+ * {@code dictionary} and append the returned id to {@code rows}; otherwise
+ * they write straight to {@code directStreamOutput} and
+ * {@code directLengthOutput}. Buffered values are turned into stream data by
+ * {@code flushDictionary()}.
+ */
+ private static abstract class StringBaseTreeWriter extends TreeWriter {
+ private static final int INITIAL_DICTIONARY_SIZE = 4096;
+ // DICTIONARY_DATA stream: the bytes of the dictionary entries.
+ private final OutStream stringOutput;
+ // LENGTH stream writer used for the dictionary entry lengths.
+ private final IntegerWriter lengthOutput;
+ // DATA stream writer used for the per-row dictionary ids.
+ private final IntegerWriter rowOutput;
+ // Distinct values seen since the dictionary was last cleared.
+ protected final StringRedBlackTree dictionary =
+ new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+ // One dictionary id per buffered non-null row, in arrival order.
+ protected final DynamicIntArray rows = new DynamicIntArray();
+ // DATA / LENGTH outputs used when values are written without a dictionary.
+ // NOTE(review): these request the same stream kinds as rowOutput and
+ // lengthOutput; presumably createStream hands back the same stream - confirm.
+ protected final PositionedOutputStream directStreamOutput;
+ protected final IntegerWriter directLengthOutput;
+ // Row index entries built by createRowIndexEntry() that still need their
+ // stream positions, which are only known once the rows are flushed.
+ private final List<OrcProto.RowIndexEntry> savedRowIndex =
+ new ArrayList<OrcProto.RowIndexEntry>();
+ private final boolean buildIndex;
+ // rows.size() at each row index boundary; always starts with a 0 entry.
+ private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+ // If the number of keys in a dictionary is greater than this fraction of
+ // the total number of non-null rows, turn off dictionary encoding
+ private final double dictionaryKeySizeThreshold;
+ protected boolean useDictionaryEncoding = true;
+ private boolean isDirectV2 = true;
+ // Set once checkDictionaryEncoding() has made its decision; it is not
+ // reset anywhere within this class.
+ private boolean doneDictionaryCheck;
+ // When true, the dictionary check is attempted at every row index entry.
+ private final boolean strideDictionaryCheck;
+
+ StringBaseTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ stringOutput = writer.createStream(id,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ lengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ rowOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+ buildIndex = writer.buildIndex();
+ directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ directLengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ Configuration conf = writer.getConfiguration();
+ dictionaryKeySizeThreshold =
+ OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+ strideDictionaryCheck =
+ OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+ doneDictionaryCheck = false;
+ }
+
+ /**
+ * Decides, the first time it is called, whether dictionary encoding stays
+ * enabled; later calls return the decision already made.
+ * @return the current value of {@code useDictionaryEncoding}
+ */
+ private boolean checkDictionaryEncoding() {
+ if (!doneDictionaryCheck) {
+ // Set the flag indicating whether or not to use dictionary encoding
+ // based on whether or not the fraction of distinct keys over number of
+ // non-null rows is less than the configured threshold
+ float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+ // When the new write format is not in use the dictionary is always kept.
+ useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+ doneDictionaryCheck = true;
+ }
+ return useDictionaryEncoding;
+ }
+
+ /**
+ * Flushes any buffered rows, writes the stripe through the parent, flushes
+ * every output and resets the per-stripe state.
+ * @param builder the stripe footer being assembled
+ * @param requiredIndexEntries passed through to the parent implementation
+ * @throws IOException if any stream fails
+ */
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ // The check may not have run yet (createRowIndexEntry only runs it when
+ // strideDictionaryCheck is set), so make sure a decision exists here.
+ checkDictionaryEncoding();
+
+ if (useDictionaryEncoding) {
+ flushDictionary();
+ } else {
+ // flushout any left over entries from dictionary
+ if (rows.size() > 0) {
+ flushDictionary();
+ }
+
+ // suppress the stream for every stripe if dictionary is disabled
+ stringOutput.suppress();
+ }
+
+ // we need to build the rowindex before calling super, since it
+ // writes it out.
+ super.writeStripe(builder, requiredIndexEntries);
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ directStreamOutput.flush();
+ directLengthOutput.flush();
+ // reset all of the fields to be ready for the next stripe.
+ dictionary.clear();
+ savedRowIndex.clear();
+ rowIndexValueCount.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+
+ if (!useDictionaryEncoding) {
+ // record the start positions of first index stride of next stripe i.e
+ // beginning of the direct streams when dictionary is disabled
+ recordDirectStreamPosition();
+ }
+ }
+
+ /**
+ * Writes every buffered row, either as dictionary ids (after writing the
+ * dictionary itself) or as direct bytes and lengths, finalizing the saved
+ * row index entries as their row counts are reached. Clears {@code rows}
+ * when done; the dictionary itself is left untouched.
+ * @throws IOException if any stream fails
+ */
+ private void flushDictionary() throws IOException {
+ // Maps an id handed out by dictionary.add() to its rank in visit order.
+ final int[] dumpOrder = new int[dictionary.size()];
+
+ if (useDictionaryEncoding) {
+ // Write the dictionary by traversing the red-black tree writing out
+ // the bytes and lengths; and creating the map from the original order
+ // to the final sorted order.
+
+ dictionary.visit(new StringRedBlackTree.Visitor() {
+ private int currentId = 0;
+ @Override
+ public void visit(StringRedBlackTree.VisitorContext context
+ ) throws IOException {
+ context.writeBytes(stringOutput);
+ lengthOutput.write(context.getLength());
+ dumpOrder[context.getOriginalPosition()] = currentId++;
+ }
+ });
+ } else {
+ // for direct encoding, we don't want the dictionary data stream
+ stringOutput.suppress();
+ }
+ int length = rows.size();
+ int rowIndexEntry = 0;
+ OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+ Text text = new Text();
+ // write the values translated into the dump order.
+ // The loop runs one step past the last row so that index entries whose
+ // boundary is the end of the buffered rows are finalized too.
+ for(int i = 0; i <= length; ++i) {
+ // now that we are writing out the row values, we can finalize the
+ // row index
+ if (buildIndex) {
+ // rowIndexValueCount holds one more element than savedRowIndex (the
+ // leading 0 plus one per saved entry), so get(rowIndexEntry) stays in
+ // range even though it is evaluated before the bounds test.
+ while (i == rowIndexValueCount.get(rowIndexEntry) &&
+ rowIndexEntry < savedRowIndex.size()) {
+ OrcProto.RowIndexEntry.Builder base =
+ savedRowIndex.get(rowIndexEntry++).toBuilder();
+ if (useDictionaryEncoding) {
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ } else {
+ PositionRecorder posn = new RowIndexPositionRecorder(base);
+ directStreamOutput.getPosition(posn);
+ directLengthOutput.getPosition(posn);
+ }
+ rowIndex.addEntry(base.build());
+ }
+ }
+ if (i != length) {
+ if (useDictionaryEncoding) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ } else {
+ // Look the value back up by id and write its bytes and length.
+ dictionary.getText(text, rows.get(i));
+ directStreamOutput.write(text.getBytes(), 0, text.getLength());
+ directLengthOutput.write(text.getLength());
+ }
+ }
+ }
+ rows.clear();
+ }
+
+ /**
+ * @return DICTIONARY_V2 / DICTIONARY (with the dictionary size) while the
+ * dictionary is in use, otherwise DIRECT_V2 / DIRECT
+ */
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ // Returns the encoding used for the last call to writeStripe
+ if (useDictionaryEncoding) {
+ if(isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+ setDictionarySize(dictionary.size()).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY).
+ setDictionarySize(dictionary.size()).build();
+ } else {
+ if(isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+ }
+
+ /**
+ * This method doesn't call the super method, because unlike most of the
+ * other TreeWriters, this one can't record the position in the streams
+ * until the stripe is being flushed. Therefore it saves all of the entries
+ * and augments them with the final information as the stripe is written.
+ * @throws IOException
+ */
+ @Override
+ void createRowIndexEntry() throws IOException {
+ getStripeStatistics().merge(indexStatistics);
+ OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ OrcProto.RowIndexEntry base = rowIndexEntry.build();
+ savedRowIndex.add(base);
+ rowIndexEntry.clear();
+ addBloomFilterEntry();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(Long.valueOf(rows.size()));
+ if (strideDictionaryCheck) {
+ checkDictionaryEncoding();
+ }
+ if (!useDictionaryEncoding) {
+ if (rows.size() > 0) {
+ // Rows buffered before the dictionary was turned off are written out
+ // now; flushDictionary() also adds the saved index entries.
+ flushDictionary();
+ // just record the start positions of next index stride
+ recordDirectStreamPosition();
+ } else {
+ // record the start positions of next index stride
+ recordDirectStreamPosition();
+ // Nothing is buffered, so the entry built above is added directly.
+ getRowIndex().addEntry(base);
+ }
+ }
+ }
+
+ /** Records the positions of both direct outputs into rowIndexPosition. */
+ private void recordDirectStreamPosition() throws IOException {
+ directStreamOutput.getPosition(rowIndexPosition);
+ directLengthOutput.getPosition(rowIndexPosition);
+ }
+
+ /** @return the combined size in bytes of the buffered rows and dictionary */
+ @Override
+ long estimateMemory() {
+ return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+ }
+ }
+
+ /**
+  * Tree writer for string columns; values are taken unchanged from a
+  * {@code BytesColumnVector}.
+  */
+ private static class StringTreeWriter extends StringBaseTreeWriter {
+   StringTreeWriter(int columnId,
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. A repeating
+    * vector contributes its slot-0 value {@code length} times.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final BytesColumnVector bytes = (BytesColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         appendValue(bytes, 0, length);
+       }
+       return;
+     }
+     final int end = offset + length;
+     for (int row = offset; row < end; ++row) {
+       if (bytes.noNulls || !bytes.isNull[row]) {
+         appendValue(bytes, row, 1);
+       }
+     }
+   }
+
+   /**
+    * Emits the value stored at {@code row} {@code repeat} times, then updates
+    * the statistics and, when enabled, the bloom filter.
+    */
+   private void appendValue(BytesColumnVector bytes, int row,
+                            int repeat) throws IOException {
+     final byte[] data = bytes.vector[row];
+     final int start = bytes.start[row];
+     final int size = bytes.length[row];
+     if (useDictionaryEncoding) {
+       final int key = dictionary.add(data, start, size);
+       for (int n = 0; n < repeat; ++n) {
+         rows.add(key);
+       }
+     } else {
+       for (int n = 0; n < repeat; ++n) {
+         directStreamOutput.write(data, start, size);
+         directLengthOutput.write(size);
+       }
+     }
+     indexStatistics.updateString(data, start, size, repeat);
+     if (createBloomFilter) {
+       bloomFilter.addBytes(data, start, size);
+     }
+   }
+ }
+
+ /**
+  * Under the covers, char is written to ORC the same way as string. Every
+  * value is emitted with exactly the schema's maximum length in bytes:
+  * shorter values are padded with spaces, longer ones are cut off.
+  */
+ private static class CharTreeWriter extends StringBaseTreeWriter {
+   private final int itemLength;
+   private final byte[] padding;
+
+   CharTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     itemLength = schema.getMaxLength();
+     padding = new byte[itemLength];
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. A repeating
+    * vector contributes its slot-0 value {@code length} times.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final BytesColumnVector bytes = (BytesColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         appendFixedWidth(bytes, 0, length);
+       }
+       return;
+     }
+     final int end = offset + length;
+     for (int row = offset; row < end; ++row) {
+       if (bytes.noNulls || !bytes.isNull[row]) {
+         appendFixedWidth(bytes, row, 1);
+       }
+     }
+   }
+
+   /**
+    * Emits the value stored at {@code row}, adjusted to {@code itemLength}
+    * bytes, {@code repeat} times, then updates the statistics and, when
+    * enabled, the bloom filter.
+    */
+   private void appendFixedWidth(BytesColumnVector bytes, int row,
+                                 int repeat) throws IOException {
+     final byte[] data;
+     final int start;
+     final int given = bytes.length[row];
+     if (given < itemLength) {
+       // too short: copy into the scratch buffer and pad with spaces
+       System.arraycopy(bytes.vector[row], bytes.start[row], padding, 0,
+           given);
+       Arrays.fill(padding, given, itemLength, (byte) ' ');
+       data = padding;
+       start = 0;
+     } else {
+       data = bytes.vector[row];
+       start = bytes.start[row];
+     }
+     if (useDictionaryEncoding) {
+       final int key = dictionary.add(data, start, itemLength);
+       for (int n = 0; n < repeat; ++n) {
+         rows.add(key);
+       }
+     } else {
+       for (int n = 0; n < repeat; ++n) {
+         directStreamOutput.write(data, start, itemLength);
+         directLengthOutput.write(itemLength);
+       }
+     }
+     indexStatistics.updateString(data, start, itemLength, repeat);
+     if (createBloomFilter) {
+       bloomFilter.addBytes(data, start, itemLength);
+     }
+   }
+ }
+
+ /**
+  * Under the covers, varchar is written to ORC the same way as string,
+  * except that each value is limited to the schema's maximum length in
+  * bytes.
+  */
+ private static class VarcharTreeWriter extends StringBaseTreeWriter {
+   private final int maxLength;
+
+   VarcharTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     maxLength = schema.getMaxLength();
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. A repeating
+    * vector contributes its slot-0 value {@code length} times.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final BytesColumnVector bytes = (BytesColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         appendLimited(bytes, 0, length);
+       }
+       return;
+     }
+     final int end = offset + length;
+     for (int row = offset; row < end; ++row) {
+       if (bytes.noNulls || !bytes.isNull[row]) {
+         appendLimited(bytes, row, 1);
+       }
+     }
+   }
+
+   /**
+    * Emits at most {@code maxLength} bytes of the value stored at
+    * {@code row}, {@code repeat} times, then updates the statistics and,
+    * when enabled, the bloom filter.
+    */
+   private void appendLimited(BytesColumnVector bytes, int row,
+                              int repeat) throws IOException {
+     final byte[] data = bytes.vector[row];
+     final int start = bytes.start[row];
+     final int itemLength = Math.min(bytes.length[row], maxLength);
+     if (useDictionaryEncoding) {
+       final int key = dictionary.add(data, start, itemLength);
+       for (int n = 0; n < repeat; ++n) {
+         rows.add(key);
+       }
+     } else {
+       for (int n = 0; n < repeat; ++n) {
+         directStreamOutput.write(data, start, itemLength);
+         directLengthOutput.write(itemLength);
+       }
+     }
+     indexStatistics.updateString(data, start, itemLength, repeat);
+     if (createBloomFilter) {
+       bloomFilter.addBytes(data, start, itemLength);
+     }
+   }
+ }
+
+ /**
+  * Tree writer for binary columns. The bytes of each value go to the DATA
+  * stream and the byte count of each value goes to the LENGTH stream.
+  */
+ private static class BinaryTreeWriter extends TreeWriter {
+   private final PositionedOutputStream stream;
+   private final IntegerWriter length;
+   private boolean isDirectV2 = true;
+
+   BinaryTreeWriter(int columnId,
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     stream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+     isDirectV2 = isNewWriteFormat(writer);
+     final OutStream lengthStream =
+         writer.createStream(id, OrcProto.Stream.Kind.LENGTH);
+     length = createIntegerWriter(lengthStream, false, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Reports DIRECT_V2 when the new write format is in use, else DIRECT. */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     final OrcProto.ColumnEncoding.Kind kind = isDirectV2
+         ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+         : OrcProto.ColumnEncoding.Kind.DIRECT;
+     return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. A repeating
+    * vector contributes its slot-0 value {@code length} times.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final BytesColumnVector bytes = (BytesColumnVector) vector;
+     if (!vector.isRepeating) {
+       final int end = offset + length;
+       for (int row = offset; row < end; ++row) {
+         if (bytes.noNulls || !bytes.isNull[row]) {
+           final byte[] data = bytes.vector[row];
+           final int start = bytes.start[row];
+           final int size = bytes.length[row];
+           stream.write(data, start, size);
+           this.length.write(size);
+           indexStatistics.updateBinary(data, start, size, 1);
+           if (createBloomFilter) {
+             bloomFilter.addBytes(data, start, size);
+           }
+         }
+       }
+       return;
+     }
+     // Repeating vector: slot 0 stands for every row of the batch.
+     if (!vector.noNulls && vector.isNull[0]) {
+       return;
+     }
+     final byte[] data = bytes.vector[0];
+     final int start = bytes.start[0];
+     final int size = bytes.length[0];
+     for (int n = 0; n < length; ++n) {
+       stream.write(data, start, size);
+       this.length.write(size);
+     }
+     indexStatistics.updateBinary(data, start, size, length);
+     if (createBloomFilter) {
+       bloomFilter.addBytes(data, start, size);
+     }
+   }
+
+   /** Flushes both outputs after the parent has written the stripe. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     stream.flush();
+     length.flush();
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Records the parent's positions, then the data and length positions. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     stream.getPosition(recorder);
+     length.getPosition(recorder);
+   }
+ }
+
+ // Number of milliseconds in one second.
+ public static final int MILLIS_PER_SECOND = 1000;
+ // Number of nanoseconds in one second.
+ static final int NANOS_PER_SECOND = 1000000000;
+ // NOTE: despite its name this is the number of nanoseconds in one
+ // millisecond; TimestampTreeWriter divides nanosecond values by it to
+ // obtain milliseconds.
+ static final int MILLIS_PER_NANO = 1000000;
+ // Timestamp whose value in seconds TimestampTreeWriter subtracts from the
+ // seconds of every timestamp before writing it.
+ public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+ /**
+  * Tree writer for timestamp columns. Each value of the incoming
+  * {@code LongColumnVector} is treated as a count of nanoseconds and split
+  * in two: whole seconds relative to {@code base_timestamp} go to the DATA
+  * stream, and the encoded nanosecond-of-second goes to the SECONDARY stream.
+  */
+ private static class TimestampTreeWriter extends TreeWriter {
+   private final IntegerWriter seconds;
+   private final IntegerWriter nanos;
+   private final boolean isDirectV2;
+   private final long base_timestamp;
+
+   TimestampTreeWriter(int columnId,
+                       TypeDescription schema,
+                       StreamFactory writer,
+                       boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     this.isDirectV2 = isNewWriteFormat(writer);
+     this.seconds = createIntegerWriter(writer.createStream(id,
+         OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
+     this.nanos = createIntegerWriter(writer.createStream(id,
+         OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+     // for unit tests to set different time zones
+     this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+     writer.useWriterTimeZone(true);
+   }
+
+   /** Reports DIRECT_V2 when the new write format is in use, else DIRECT. */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     if (isDirectV2) {
+       return OrcProto.ColumnEncoding.newBuilder()
+           .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+     }
+     return OrcProto.ColumnEncoding.newBuilder()
+         .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. Both the
+    * repeating and the non-repeating branch derive the nanosecond part
+    * through {@link #nanosOfSecond(long)}, so a value before the epoch is
+    * encoded the same way whichever branch handles it.
+    *
+    * @param vector the column vector; must be a {@code LongColumnVector}
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if a stream cannot be written
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     LongColumnVector vec = (LongColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         long value = vec.vector[0];
+         long valueMillis = value / MILLIS_PER_NANO;
+         indexStatistics.updateTimestamp(valueMillis);
+         if (createBloomFilter) {
+           bloomFilter.addLong(valueMillis);
+         }
+         final long secs = value / NANOS_PER_SECOND - base_timestamp;
+         // Previously the raw remainder was used here, which is negative for
+         // values below zero and was then written to the nanos stream.
+         final long nano = formatNanos(nanosOfSecond(value));
+         for(int i=0; i < length; ++i) {
+           seconds.write(secs);
+           nanos.write(nano);
+         }
+       }
+     } else {
+       for(int i=0; i < length; ++i) {
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           long value = vec.vector[i + offset];
+           long valueMillis = value / MILLIS_PER_NANO;
+           long valueSecs = value /NANOS_PER_SECOND - base_timestamp;
+           seconds.write(valueSecs);
+           nanos.write(formatNanos(nanosOfSecond(value)));
+           indexStatistics.updateTimestamp(valueMillis);
+           if (createBloomFilter) {
+             bloomFilter.addLong(valueMillis);
+           }
+         }
+       }
+     }
+   }
+
+   /** Flushes both writers after the parent has written the stripe. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     seconds.flush();
+     nanos.flush();
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Returns the nanosecond-of-second part of {@code value}, shifted into
+    * the range {@code [0, NANOS_PER_SECOND)} when the remainder is negative.
+    */
+   private static int nanosOfSecond(long value) {
+     int result = (int) (value % NANOS_PER_SECOND);
+     if (result < 0) {
+       result += NANOS_PER_SECOND;
+     }
+     return result;
+   }
+
+   /**
+    * Encodes a nanosecond count. Zero is returned as 0. A value that is not
+    * a multiple of 100 is returned shifted left by three bits. Otherwise the
+    * value is divided by 100 and then by 10 for as long as it stays
+    * divisible (the counter, which starts at 1, is capped at 7); the result
+    * is the reduced value shifted left by three bits with the counter in the
+    * low three bits.
+    */
+   private static long formatNanos(int nanos) {
+     if (nanos == 0) {
+       return 0;
+     } else if (nanos % 100 != 0) {
+       return ((long) nanos) << 3;
+     } else {
+       nanos /= 100;
+       int trailingZeros = 1;
+       while (nanos % 10 == 0 && trailingZeros < 7) {
+         nanos /= 10;
+         trailingZeros += 1;
+       }
+       return ((long) nanos) << 3 | trailingZeros;
+     }
+   }
+
+   /** Records the parent's positions, then the seconds and nanos positions. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     seconds.getPosition(recorder);
+     nanos.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Tree writer for date columns. Values are read from a
+  * {@code LongColumnVector}, narrowed to {@code int} and written to the DATA
+  * stream through an {@code IntegerWriter}.
+  */
+ private static class DateTreeWriter extends TreeWriter {
+   private final IntegerWriter writer;
+   private final boolean isDirectV2;
+
+   DateTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     final OutStream dataStream =
+         writer.createStream(id, OrcProto.Stream.Kind.DATA);
+     this.isDirectV2 = isNewWriteFormat(writer);
+     this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. A repeating
+    * vector contributes its slot-0 value {@code length} times.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final LongColumnVector longs = (LongColumnVector) vector;
+     if (!vector.isRepeating) {
+       final int end = offset + length;
+       for (int row = offset; row < end; ++row) {
+         if (longs.noNulls || !longs.isNull[row]) {
+           final int current = (int) longs.vector[row];
+           writer.write(current);
+           indexStatistics.updateDate(current);
+           if (createBloomFilter) {
+             bloomFilter.addLong(current);
+           }
+         }
+       }
+       return;
+     }
+     // Repeating vector: slot 0 stands for every row of the batch.
+     if (!vector.noNulls && vector.isNull[0]) {
+       return;
+     }
+     final int repeated = (int) longs.vector[0];
+     indexStatistics.updateDate(repeated);
+     if (createBloomFilter) {
+       bloomFilter.addLong(repeated);
+     }
+     for (int remaining = length; remaining > 0; --remaining) {
+       writer.write(repeated);
+     }
+   }
+
+   /** Flushes the value writer after the parent has written the stripe. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     writer.flush();
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Records the parent's positions followed by the value writer's. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     writer.getPosition(recorder);
+   }
+
+   /** Reports DIRECT_V2 when the new write format is in use, else DIRECT. */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     final OrcProto.ColumnEncoding.Kind kind = isDirectV2
+         ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+         : OrcProto.ColumnEncoding.Kind.DIRECT;
+     return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
+   }
+ }
+
+ /**
+  * Tree writer for decimal columns. The unscaled value of each decimal goes
+  * to the DATA stream and its scale goes to the SECONDARY stream.
+  */
+ private static class DecimalTreeWriter extends TreeWriter {
+   private final PositionedOutputStream valueStream;
+   private final IntegerWriter scaleStream;
+   private final boolean isDirectV2;
+
+   DecimalTreeWriter(int columnId,
+                     TypeDescription schema,
+                     StreamFactory writer,
+                     boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     this.isDirectV2 = isNewWriteFormat(writer);
+     valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+     this.scaleStream = createIntegerWriter(writer.createStream(id,
+         OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Reports DIRECT_V2 when the new write format is in use, else DIRECT. */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     if (isDirectV2) {
+       return OrcProto.ColumnEncoding.newBuilder()
+           .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+     }
+     return OrcProto.ColumnEncoding.newBuilder()
+         .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}.
+    *
+    * @param vector the column vector; must be a {@code DecimalColumnVector}
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if a stream cannot be written
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     DecimalColumnVector vec = (DecimalColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         HiveDecimal value = vec.vector[0].getHiveDecimal();
+         if (createBloomFilter) {
+           bloomFilter.addString(value.toString());
+         }
+         for(int i=0; i < length; ++i) {
+           // updateDecimal takes no repetition count, so it is called once
+           // per row; a single call would make per-row aggregates (the ORC
+           // decimal statistics carry a sum) differ from what the
+           // non-repeating branch records for the same data.
+           indexStatistics.updateDecimal(value);
+           SerializationUtils.writeBigInteger(valueStream,
+               value.unscaledValue());
+           scaleStream.write(value.scale());
+         }
+       }
+     } else {
+       for(int i=0; i < length; ++i) {
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
+           SerializationUtils.writeBigInteger(valueStream,
+               value.unscaledValue());
+           scaleStream.write(value.scale());
+           indexStatistics.updateDecimal(value);
+           if (createBloomFilter) {
+             bloomFilter.addString(value.toString());
+           }
+         }
+       }
+     }
+   }
+
+   /** Flushes both outputs after the parent has written the stripe. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     valueStream.flush();
+     scaleStream.flush();
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Records the parent's positions, then the value and scale positions. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     valueStream.getPosition(recorder);
+     scaleStream.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Tree writer for struct columns. It owns one child writer per field and
+  * forwards rows to them, skipping the rows where the struct itself is null.
+  */
+ private static class StructTreeWriter extends TreeWriter {
+   StructTreeWriter(int columnId,
+                    TypeDescription schema,
+                    StreamFactory writer,
+                    boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     final List<TypeDescription> fieldTypes = schema.getChildren();
+     childrenWriters = new TreeWriter[fieldTypes.size()];
+     int field = 0;
+     while (field < childrenWriters.length) {
+       childrenWriters[field] =
+           createTreeWriter(fieldTypes.get(field), writer, true);
+       field += 1;
+     }
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Writes rows of a top-level batch: counts them in the statistics of this
+    * column and hands each batch column to the matching child writer.
+    */
+   @Override
+   void writeRootBatch(VectorizedRowBatch batch, int offset,
+                       int length) throws IOException {
+     // update the statistics for the root column
+     indexStatistics.increment(length);
+     // The root column is assumed not to be nullable, so isPresent is not
+     // updated here.
+     int column = 0;
+     while (column < childrenWriters.length) {
+       childrenWriters[column].writeBatch(batch.cols[column], offset, length);
+       column += 1;
+     }
+   }
+
+   /** Forwards one contiguous range of rows to every field writer. */
+   private static void writeFields(StructColumnVector vector,
+                                   TreeWriter[] childrenWriters,
+                                   int offset, int length) throws IOException {
+     int field = 0;
+     while (field < childrenWriters.length) {
+       childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+       field += 1;
+     }
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. When the vector
+    * has nulls, the non-null rows are forwarded to the fields as contiguous
+    * runs.
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     final StructColumnVector struct = (StructColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         writeFields(struct, childrenWriters, offset, length);
+       }
+       return;
+     }
+     if (vector.noNulls) {
+       writeFields(struct, childrenWriters, offset, length);
+       return;
+     }
+     // write the records in runs; runStart < 0 means no run is open
+     int runStart = -1;
+     for (int i = 0; i < length; ++i) {
+       final boolean present = !struct.isNull[i + offset];
+       if (present) {
+         if (runStart < 0) {
+           runStart = i;
+         }
+       } else if (runStart >= 0) {
+         writeFields(struct, childrenWriters, offset + runStart,
+             i - runStart);
+         runStart = -1;
+       }
+     }
+     if (runStart >= 0) {
+       writeFields(struct, childrenWriters, offset + runStart,
+           length - runStart);
+     }
+   }
+
+   /** Writes the stripe for this column and then for every field. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     for (int field = 0; field < childrenWriters.length; ++field) {
+       childrenWriters[field].writeStripe(builder, requiredIndexEntries);
+     }
+     recordPosition(rowIndexPosition);
+   }
+ }
+
+ /**
+  * Tree writer for list columns. The element count of each list goes to the
+  * LENGTH stream and the elements themselves are forwarded to the single
+  * child writer.
+  */
+ private static class ListTreeWriter extends TreeWriter {
+   private final IntegerWriter lengths;
+   private final boolean isDirectV2;
+
+   ListTreeWriter(int columnId,
+                  TypeDescription schema,
+                  StreamFactory writer,
+                  boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     this.isDirectV2 = isNewWriteFormat(writer);
+     childrenWriters = new TreeWriter[1];
+     childrenWriters[0] =
+         createTreeWriter(schema.getChildren().get(0), writer, true);
+     lengths = createIntegerWriter(writer.createStream(columnId,
+         OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Reports DIRECT_V2 when the new write format is in use, else DIRECT. */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     if (isDirectV2) {
+       return OrcProto.ColumnEncoding.newBuilder()
+           .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+     }
+     return OrcProto.ColumnEncoding.newBuilder()
+         .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+   }
+
+   /**
+    * Writes {@code length} rows starting at {@code offset}. In the
+    * non-repeating case, adjacent lists whose elements are contiguous in the
+    * child vector are merged into a single call on the child writer.
+    *
+    * @param vector the column vector; must be a {@code ListColumnVector}
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if a stream cannot be written
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     ListColumnVector vec = (ListColumnVector) vector;
+     if (vector.isRepeating) {
+       if (vector.noNulls || !vector.isNull[0]) {
+         int childOffset = (int) vec.offsets[0];
+         int childLength = (int) vec.lengths[0];
+         for(int i=0; i < length; ++i) {
+           lengths.write(childLength);
+           childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+         }
+         if (createBloomFilter) {
+           bloomFilter.addLong(childLength);
+         }
+       }
+     } else {
+       // write the elements in runs
+       // NOTE(review): unlike the repeating branch, this branch never adds
+       // the list lengths to the bloom filter - confirm that is intended.
+       int currentOffset = 0;
+       int currentLength = 0;
+       for(int i=0; i < length; ++i) {
+         // Honor noNulls like the other writers do: when it is set, the
+         // isNull entries are not consulted, so a stale flag cannot cause a
+         // list's length and elements to be skipped.
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           int nextLength = (int) vec.lengths[offset + i];
+           int nextOffset = (int) vec.offsets[offset + i];
+           lengths.write(nextLength);
+           if (currentLength == 0) {
+             currentOffset = nextOffset;
+             currentLength = nextLength;
+           } else if (currentOffset + currentLength != nextOffset) {
+             // not contiguous with the pending run: flush it and start anew
+             childrenWriters[0].writeBatch(vec.child, currentOffset,
+                 currentLength);
+             currentOffset = nextOffset;
+             currentLength = nextLength;
+           } else {
+             currentLength += nextLength;
+           }
+         }
+       }
+       if (currentLength != 0) {
+         childrenWriters[0].writeBatch(vec.child, currentOffset,
+             currentLength);
+       }
+     }
+   }
+
+   /** Flushes the lengths and writes the stripe for the child writer. */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     lengths.flush();
+     for(TreeWriter child: childrenWriters) {
+       child.writeStripe(builder, requiredIndexEntries);
+     }
+     recordPosition(rowIndexPosition);
+   }
+
+   /** Records the parent's positions followed by the lengths position. */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     lengths.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Tree writer for MAP columns. For every non-null row it writes the
+  * entry count to a LENGTH stream and forwards the same range of the key
+  * and value vectors to childrenWriters[0] and childrenWriters[1].
+  */
+ private static class MapTreeWriter extends TreeWriter {
+   private final IntegerWriter lengths;
+   private final boolean isDirectV2;
+
+   /**
+    * Creates the key and value writers (both created with nullable = true)
+    * and the LENGTH stream for this column, then records the initial
+    * position.
+    *
+    * @param columnId the id of this column
+    * @param schema the map type; child 0 is the key, child 1 the value
+    * @param writer the factory used for streams and child writers
+    * @param nullable passed through to the parent constructor
+    * @throws IOException if a stream or child writer cannot be created
+    */
+   MapTreeWriter(int columnId,
+                 TypeDescription schema,
+                 StreamFactory writer,
+                 boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     this.isDirectV2 = isNewWriteFormat(writer);
+     childrenWriters = new TreeWriter[2];
+     List<TypeDescription> children = schema.getChildren();
+     childrenWriters[0] =
+         createTreeWriter(children.get(0), writer, true);
+     childrenWriters[1] =
+         createTreeWriter(children.get(1), writer, true);
+     lengths = createIntegerWriter(writer.createStream(columnId,
+         OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * @return DIRECT_V2 when isDirectV2 is set, DIRECT otherwise
+    */
+   @Override
+   OrcProto.ColumnEncoding getEncoding() {
+     if (isDirectV2) {
+       return OrcProto.ColumnEncoding.newBuilder()
+           .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+     }
+     return OrcProto.ColumnEncoding.newBuilder()
+         .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+   }
+
+   /**
+    * Writes rows [offset, offset + length) of a MapColumnVector.
+    *
+    * @param vector the batch column; must be a MapColumnVector
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if any underlying stream write fails
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     MapColumnVector vec = (MapColumnVector) vector;
+     if (vector.isRepeating) {
+       // Entry 0 stands for every row; emit it once per row unless null.
+       if (vector.noNulls || !vector.isNull[0]) {
+         int childOffset = (int) vec.offsets[0];
+         int childLength = (int) vec.lengths[0];
+         for(int i=0; i < length; ++i) {
+           lengths.write(childLength);
+           childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
+           childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+         }
+         if (createBloomFilter) {
+           bloomFilter.addLong(childLength);
+         }
+       }
+     } else {
+       // Write the entries in runs: rows whose entry ranges sit back to
+       // back in the key/value vectors are merged into one writeBatch per
+       // child.
+       int currentOffset = 0;
+       int currentLength = 0;
+       for(int i=0; i < length; ++i) {
+         // Only consult isNull when the vector reports nulls, exactly as the
+         // repeating branch does; a noNulls vector is treated as fully
+         // populated so no row's length or entries are dropped.
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           int nextLength = (int) vec.lengths[offset + i];
+           int nextOffset = (int) vec.offsets[offset + i];
+           lengths.write(nextLength);
+           if (currentLength == 0) {
+             // nothing pending: start a new run at this row
+             currentOffset = nextOffset;
+             currentLength = nextLength;
+           } else if (currentOffset + currentLength != nextOffset) {
+             // not contiguous: flush the pending run and start a new one
+             childrenWriters[0].writeBatch(vec.keys, currentOffset,
+                 currentLength);
+             childrenWriters[1].writeBatch(vec.values, currentOffset,
+                 currentLength);
+             currentOffset = nextOffset;
+             currentLength = nextLength;
+           } else {
+             currentLength += nextLength;
+           }
+         }
+       }
+       if (currentLength != 0) {
+         childrenWriters[0].writeBatch(vec.keys, currentOffset,
+             currentLength);
+         childrenWriters[1].writeBatch(vec.values, currentOffset,
+             currentLength);
+       }
+     }
+   }
+
+   /**
+    * Finishes the stripe for this column, flushes the LENGTH writer, lets
+    * each child finish its stripe, and records the new position.
+    *
+    * @param builder the stripe footer being assembled
+    * @param requiredIndexEntries passed through to parent and children
+    * @throws IOException if any writer fails
+    */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     lengths.flush();
+     for(TreeWriter child: childrenWriters) {
+       child.writeStripe(builder, requiredIndexEntries);
+     }
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Records the parent's positions followed by the LENGTH writer's.
+    *
+    * @param recorder the recorder receiving the positions
+    * @throws IOException if a position cannot be obtained
+    */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     lengths.getPosition(recorder);
+   }
+ }
+
+ /**
+  * Tree writer for UNION columns. For every non-null row it writes the
+  * tag byte to a DATA stream and forwards the row to the child writer
+  * selected by that tag.
+  */
+ private static class UnionTreeWriter extends TreeWriter {
+   private final RunLengthByteWriter tags;
+
+   /**
+    * Creates one child writer per union branch (each created with
+    * nullable = true) and the tag stream, then records the initial
+    * position.
+    *
+    * @param columnId the id of this column
+    * @param schema the union type; each child is one branch
+    * @param writer the factory used for streams and child writers
+    * @param nullable passed through to the parent constructor
+    * @throws IOException if a stream or child writer cannot be created
+    */
+   UnionTreeWriter(int columnId,
+                   TypeDescription schema,
+                   StreamFactory writer,
+                   boolean nullable) throws IOException {
+     super(columnId, schema, writer, nullable);
+     List<TypeDescription> children = schema.getChildren();
+     childrenWriters = new TreeWriter[children.size()];
+     for(int i=0; i < childrenWriters.length; ++i) {
+       childrenWriters[i] =
+           createTreeWriter(children.get(i), writer, true);
+     }
+     tags =
+         new RunLengthByteWriter(writer.createStream(columnId,
+             OrcProto.Stream.Kind.DATA));
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Writes rows [offset, offset + length) of a UnionColumnVector.
+    *
+    * @param vector the batch column; must be a UnionColumnVector
+    * @param offset index of the first row to write
+    * @param length number of rows to write
+    * @throws IOException if any underlying stream write fails
+    */
+   @Override
+   void writeBatch(ColumnVector vector, int offset,
+                   int length) throws IOException {
+     super.writeBatch(vector, offset, length);
+     UnionColumnVector vec = (UnionColumnVector) vector;
+     if (vector.isRepeating) {
+       // Entry 0 supplies the tag for every row unless it is null.
+       if (vector.noNulls || !vector.isNull[0]) {
+         byte tag = (byte) vec.tags[0];
+         for(int i=0; i < length; ++i) {
+           tags.write(tag);
+         }
+         if (createBloomFilter) {
+           bloomFilter.addLong(tag);
+         }
+         childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+       }
+     } else {
+       // Write the records in runs of the same tag: consecutive non-null
+       // rows sharing a tag go to that branch's writer in one writeBatch.
+       byte prevTag = 0;
+       int currentRun = 0;
+       boolean started = false;
+       for(int i=0; i < length; ++i) {
+         // Only consult isNull when the vector reports nulls, exactly as the
+         // repeating branch does; a noNulls vector is treated as fully
+         // populated so no row's tag or value is dropped.
+         if (vec.noNulls || !vec.isNull[i + offset]) {
+           byte tag = (byte) vec.tags[offset + i];
+           tags.write(tag);
+           if (!started) {
+             started = true;
+             currentRun = i;
+             prevTag = tag;
+           } else if (tag != prevTag) {
+             // tag changed: flush the run collected for the previous tag
+             childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+                 offset + currentRun, i - currentRun);
+             currentRun = i;
+             prevTag = tag;
+           }
+         } else if (started) {
+           // a null row ends the current run
+           started = false;
+           childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+               offset + currentRun, i - currentRun);
+         }
+       }
+       if (started) {
+         childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+             offset + currentRun, length - currentRun);
+       }
+     }
+   }
+
+   /**
+    * Finishes the stripe for this column, flushes the tag writer, lets
+    * each child finish its stripe, and records the new position.
+    *
+    * @param builder the stripe footer being assembled
+    * @param requiredIndexEntries passed through to parent and children
+    * @throws IOException if any writer fails
+    */
+   @Override
+   void writeStripe(OrcProto.StripeFooter.Builder builder,
+                    int requiredIndexEntries) throws IOException {
+     super.writeStripe(builder, requiredIndexEntries);
+     tags.flush();
+     for(TreeWriter child: childrenWriters) {
+       child.writeStripe(builder, requiredIndexEntries);
+     }
+     recordPosition(rowIndexPosition);
+   }
+
+   /**
+    * Records the parent's positions followed by the tag writer's.
+    *
+    * @param recorder the recorder receiving the positions
+    * @throws IOException if a position cannot be obtained
+    */
+   @Override
+   void recordPosition(PositionRecorder recorder) throws IOException {
+     super.recordPosition(recorder);
+     tags.getPosition(recorder);
+   }
+ }
+
+ private static TreeWriter createTreeWriter(TypeDescription schema,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case BYTE:
+ return new ByteTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case FLOAT:
+ return new FloatTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case DOUBLE:
+ return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case STRING:
+ return new StringTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case CHAR:
+ return new CharTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case VARCHAR:
+ return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case BINARY:
+ return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case TIMESTAMP:
+ return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case DATE:
+ return new DateTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case DECIMAL:
+ return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case STRUCT:
+ return new StructTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case MAP:
+ return new MapTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case LIST:
+ return new ListTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case UNION:
+ return new UnionTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad category: " +
+ schema.getCategory());
+ }
+ }
+
+ private static void writeTypes(OrcProto.Footer.Builder builder,
+ TypeDescription schema) {
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ List<TypeDescription> children = schema.getChildren();
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(OrcProto.Type.Kind.VARCHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(schema.getPrecision());
+ type.setScale(schema.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(children.get(0).getId());
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TypeD
<TRUNCATED>