You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2015/12/14 22:36:15 UTC
[2/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 993be71..8cfb402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -18,52 +18,15 @@
package org.apache.hadoop.hive.ql.io.orc;
-import static com.google.common.base.Preconditions.checkArgument;
-
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
+import java.util.Set;
-import org.apache.orc.BinaryColumnStatistics;
-import org.apache.orc.impl.BitFieldWriter;
-import org.apache.orc.impl.ColumnStatisticsImpl;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.impl.DynamicIntArray;
-import org.apache.orc.impl.IntegerWriter;
-import org.apache.orc.impl.MemoryManager;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.PositionRecorder;
-import org.apache.orc.impl.PositionedOutputStream;
-import org.apache.orc.impl.RunLengthByteWriter;
-import org.apache.orc.impl.RunLengthIntegerWriter;
-import org.apache.orc.impl.RunLengthIntegerWriterV2;
-import org.apache.orc.impl.SerializationUtils;
-import org.apache.orc.impl.SnappyCodec;
-import org.apache.orc.impl.StreamName;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.impl.StringRedBlackTree;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ZlibCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -74,10 +37,6 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
-import org.apache.orc.CompressionCodec.Modifier;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -98,17 +57,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -128,3184 +78,240 @@ import com.google.protobuf.CodedOutputStream;
* thread as well.
*
*/
-public class WriterImpl implements Writer, MemoryManager.Callback {
-
- private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
- static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
-
- private static final int HDFS_BUFFER_SIZE = 256 * 1024;
- private static final int MIN_ROW_INDEX_STRIDE = 1000;
+public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer {
- // threshold above which buffer size will be automatically resized
- private static final int COLUMN_COUNT_THRESHOLD = 1000;
-
- private final FileSystem fs;
- private final Path path;
- private final long defaultStripeSize;
- private long adjustedStripeSize;
- private final int rowIndexStride;
- private final CompressionKind compress;
- private final CompressionCodec codec;
- private final boolean addBlockPadding;
- private final int bufferSize;
- private final long blockSize;
- private final double paddingTolerance;
- private final TypeDescription schema;
-
- // the streams that make up the current stripe
- private final Map<StreamName, BufferedStream> streams =
- new TreeMap<StreamName, BufferedStream>();
-
- private FSDataOutputStream rawWriter = null;
- // the compressed metadata information outStream
- private OutStream writer = null;
- // a protobuf outStream around streamFactory
- private CodedOutputStream protobufWriter = null;
- private long headerLength;
- private int columnCount;
- private long rowCount = 0;
- private long rowsInStripe = 0;
- private long rawDataSize = 0;
- private int rowsInIndex = 0;
- private int stripesAtLastFlush = -1;
- private final List<OrcProto.StripeInformation> stripes =
- new ArrayList<OrcProto.StripeInformation>();
- private final Map<String, ByteString> userMetadata =
- new TreeMap<String, ByteString>();
- private final StreamFactory streamFactory = new StreamFactory();
- private final TreeWriter treeWriter;
- private final boolean buildIndex;
- private final MemoryManager memoryManager;
- private final OrcFile.Version version;
- private final Configuration conf;
- private final OrcFile.WriterCallback callback;
- private final OrcFile.WriterContext callbackContext;
- private final OrcFile.EncodingStrategy encodingStrategy;
- private final OrcFile.CompressionStrategy compressionStrategy;
- private final boolean[] bloomFilterColumns;
- private final double bloomFilterFpp;
- private boolean writeTimeZone;
+ private final ObjectInspector inspector;
+ private final VectorizedRowBatch internalBatch;
+ private final StructField[] fields;
WriterImpl(FileSystem fs,
- Path path,
- Configuration conf,
- ObjectInspector inspector,
- TypeDescription schema,
- long stripeSize,
- CompressionKind compress,
- int bufferSize,
- int rowIndexStride,
- MemoryManager memoryManager,
- boolean addBlockPadding,
- OrcFile.Version version,
- OrcFile.WriterCallback callback,
- OrcFile.EncodingStrategy encodingStrategy,
- OrcFile.CompressionStrategy compressionStrategy,
- double paddingTolerance,
- long blockSizeValue,
- String bloomFilterColumnNames,
- double bloomFilterFpp) throws IOException {
- this.fs = fs;
- this.path = path;
- this.conf = conf;
- this.callback = callback;
- this.schema = schema;
- if (callback != null) {
- callbackContext = new OrcFile.WriterContext(){
-
- @Override
- public Writer getWriter() {
- return WriterImpl.this;
- }
- };
- } else {
- callbackContext = null;
- }
- this.adjustedStripeSize = stripeSize;
- this.defaultStripeSize = stripeSize;
- this.version = version;
- this.encodingStrategy = encodingStrategy;
- this.compressionStrategy = compressionStrategy;
- this.addBlockPadding = addBlockPadding;
- this.blockSize = blockSizeValue;
- this.paddingTolerance = paddingTolerance;
- this.compress = compress;
- this.rowIndexStride = rowIndexStride;
- this.memoryManager = memoryManager;
- buildIndex = rowIndexStride > 0;
- codec = createCodec(compress);
- int numColumns = schema.getMaximumId() + 1;
- this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
- numColumns, bufferSize);
- if (version == OrcFile.Version.V_0_11) {
- /* do not write bloom filters for ORC v11 */
- this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
- } else {
- this.bloomFilterColumns =
- OrcUtils.includeColumns(bloomFilterColumnNames, schema);
- }
- this.bloomFilterFpp = bloomFilterFpp;
- treeWriter = createTreeWriter(inspector, schema, streamFactory, false);
- if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
- throw new IllegalArgumentException("Row stride must be at least " +
- MIN_ROW_INDEX_STRIDE);
- }
-
- // ensure that we are able to handle callbacks before we register ourselves
- memoryManager.addWriter(path, stripeSize, this);
- }
-
- @VisibleForTesting
- static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) {
- // The worst case is that there are 2 big streams per a column and
- // we want to guarantee that each stream gets ~10 buffers.
- // This keeps buffers small enough that we don't get really small stripe
- // sizes.
- int estBufferSize = (int) (stripeSize / (20 * numColumns));
- estBufferSize = getClosestBufferSize(estBufferSize);
- if (estBufferSize > bs) {
- estBufferSize = bs;
- } else {
- LOG.info("WIDE TABLE - Number of columns: " + numColumns +
- " Chosen compression buffer size: " + estBufferSize);
- }
- return estBufferSize;
- }
-
- private static int getClosestBufferSize(int estBufferSize) {
- final int kb4 = 4 * 1024;
- final int kb8 = 8 * 1024;
- final int kb16 = 16 * 1024;
- final int kb32 = 32 * 1024;
- final int kb64 = 64 * 1024;
- final int kb128 = 128 * 1024;
- final int kb256 = 256 * 1024;
- if (estBufferSize <= kb4) {
- return kb4;
- } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
- return kb8;
- } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
- return kb16;
- } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
- return kb32;
- } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
- return kb64;
- } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
- return kb128;
+ Path path,
+ OrcFile.WriterOptions opts) throws IOException {
+ super(fs, path, opts);
+ this.inspector = opts.getInspector();
+ internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ if (inspector instanceof StructObjectInspector) {
+ List<? extends StructField> fieldList =
+ ((StructObjectInspector) inspector).getAllStructFieldRefs();
+ fields = new StructField[fieldList.size()];
+ fieldList.toArray(fields);
} else {
- return kb256;
- }
- }
-
- public static CompressionCodec createCodec(CompressionKind kind) {
- switch (kind) {
- case NONE:
- return null;
- case ZLIB:
- return new ZlibCodec();
- case SNAPPY:
- return new SnappyCodec();
- case LZO:
- try {
- Class<? extends CompressionCodec> lzo =
- (Class<? extends CompressionCodec>)
- JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
- return lzo.newInstance();
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("LZO is not available.", e);
- } catch (InstantiationException e) {
- throw new IllegalArgumentException("Problem initializing LZO", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Insufficient access to LZO", e);
- }
- default:
- throw new IllegalArgumentException("Unknown compression codec: " +
- kind);
- }
- }
-
- @Override
- public boolean checkMemory(double newScale) throws IOException {
- long limit = (long) Math.round(adjustedStripeSize * newScale);
- long size = estimateStripeSize();
- if (LOG.isDebugEnabled()) {
- LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
- limit);
- }
- if (size > limit) {
- flushStripe();
- return true;
- }
- return false;
- }
-
- /**
- * This class is used to hold the contents of streams as they are buffered.
- * The TreeWriters write to the outStream and the codec compresses the
- * data as buffers fill up and stores them in the output list. When the
- * stripe is being written, the whole stream is written to the file.
- */
- private class BufferedStream implements OutStream.OutputReceiver {
- private final OutStream outStream;
- private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
- BufferedStream(String name, int bufferSize,
- CompressionCodec codec) throws IOException {
- outStream = new OutStream(name, bufferSize, codec, this);
- }
-
- /**
- * Receive a buffer from the compression codec.
- * @param buffer the buffer to save
- * @throws IOException
- */
- @Override
- public void output(ByteBuffer buffer) {
- output.add(buffer);
- }
-
- /**
- * Get the number of bytes in buffers that are allocated to this stream.
- * @return number of bytes in buffers
- */
- public long getBufferSize() {
- long result = 0;
- for(ByteBuffer buf: output) {
- result += buf.capacity();
- }
- return outStream.getBufferSize() + result;
- }
-
- /**
- * Flush the stream to the codec.
- * @throws IOException
- */
- public void flush() throws IOException {
- outStream.flush();
- }
-
- /**
- * Clear all of the buffers.
- * @throws IOException
- */
- public void clear() throws IOException {
- outStream.clear();
- output.clear();
- }
-
- /**
- * Check the state of suppress flag in output stream
- * @return value of suppress flag
- */
- public boolean isSuppressed() {
- return outStream.isSuppressed();
- }
-
- /**
- * Get the number of bytes that will be written to the output. Assumes
- * the stream has already been flushed.
- * @return the number of bytes
- */
- public long getOutputSize() {
- long result = 0;
- for(ByteBuffer buffer: output) {
- result += buffer.remaining();
- }
- return result;
- }
-
- /**
- * Write the saved compressed buffers to the OutputStream.
- * @param out the stream to write to
- * @throws IOException
- */
- void spillTo(OutputStream out) throws IOException {
- for(ByteBuffer buffer: output) {
- out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
- @Override
- public String toString() {
- return outStream.toString();
- }
- }
-
- /**
- * An output receiver that writes the ByteBuffers to the output stream
- * as they are received.
- */
- private class DirectStream implements OutStream.OutputReceiver {
- private final FSDataOutputStream output;
-
- DirectStream(FSDataOutputStream output) {
- this.output = output;
- }
-
- @Override
- public void output(ByteBuffer buffer) throws IOException {
- output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
- private static class RowIndexPositionRecorder implements PositionRecorder {
- private final OrcProto.RowIndexEntry.Builder builder;
-
- RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
- this.builder = builder;
- }
-
- @Override
- public void addPosition(long position) {
- builder.addPositions(position);
+ fields = null;
}
}
- /**
- * Interface from the Writer to the TreeWriters. This limits the visibility
- * that the TreeWriters have into the Writer.
- */
- private class StreamFactory {
- /**
- * Create a stream to store part of a column.
- * @param column the column id for the stream
- * @param kind the kind of stream
- * @return The output outStream that the section needs to be written to.
- * @throws IOException
- */
- public OutStream createStream(int column,
- OrcProto.Stream.Kind kind
- ) throws IOException {
- final StreamName name = new StreamName(column, kind);
- final EnumSet<CompressionCodec.Modifier> modifiers;
-
- switch (kind) {
- case BLOOM_FILTER:
- case DATA:
- case DICTIONARY_DATA:
- if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
- modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
- } else {
- modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
- }
- break;
- case LENGTH:
- case DICTIONARY_COUNT:
- case PRESENT:
- case ROW_INDEX:
- case SECONDARY:
- // easily compressed using the fastest modes
- modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
- break;
- default:
- LOG.warn("Missing ORC compression modifiers for " + kind);
- modifiers = null;
- break;
- }
-
- BufferedStream result = streams.get(name);
- if (result == null) {
- result = new BufferedStream(name.toString(), bufferSize,
- codec == null ? codec : codec.modify(modifiers));
- streams.put(name, result);
- }
- return result.outStream;
- }
-
- /**
- * Get the next column id.
- * @return a number from 0 to the number of columns - 1
- */
- public int getNextColumnId() {
- return columnCount++;
- }
-
- /**
- * Get the stride rate of the row index.
- */
- public int getRowIndexStride() {
- return rowIndexStride;
- }
-
- /**
- * Should be building the row index.
- * @return true if we are building the index
- */
- public boolean buildIndex() {
- return buildIndex;
- }
-
- /**
- * Is the ORC file compressed?
- * @return are the streams compressed
- */
- public boolean isCompressed() {
- return codec != null;
- }
-
- /**
- * Get the encoding strategy to use.
- * @return encoding strategy
- */
- public OrcFile.EncodingStrategy getEncodingStrategy() {
- return encodingStrategy;
- }
-
- /**
- * Get the compression strategy to use.
- * @return compression strategy
- */
- public OrcFile.CompressionStrategy getCompressionStrategy() {
- return compressionStrategy;
- }
-
- /**
- * Get the bloom filter columns
- * @return bloom filter columns
- */
- public boolean[] getBloomFilterColumns() {
- return bloomFilterColumns;
- }
-
- /**
- * Get bloom filter false positive percentage.
- * @return fpp
- */
- public double getBloomFilterFPP() {
- return bloomFilterFpp;
- }
-
- /**
- * Get the writer's configuration.
- * @return configuration
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Get the version of the file to write.
- */
- public OrcFile.Version getVersion() {
- return version;
- }
-
- public void useWriterTimeZone(boolean val) {
- writeTimeZone = val;
- }
-
- public boolean hasWriterTimeZone() {
- return writeTimeZone;
- }
- }
+ private static final long NANOS_PER_MILLI = 1000000;
/**
- * The parent class of all of the writers for each column. Each column
- * is written by an instance of this class. The compound types (struct,
- * list, map, and union) have children tree writers that write the children
- * types.
+ * Set the value for a given column value within a batch.
+ * @param rowId the row to set
+ * @param column the column to set
+ * @param inspector the object inspector to interpret the obj
+ * @param obj the value to use
*/
- private abstract static class TreeWriter {
- protected final int id;
- protected final ObjectInspector inspector;
- protected final BitFieldWriter isPresent;
- private final boolean isCompressed;
- protected final ColumnStatisticsImpl indexStatistics;
- protected final ColumnStatisticsImpl stripeColStatistics;
- private final ColumnStatisticsImpl fileStatistics;
- protected TreeWriter[] childrenWriters;
- protected final RowIndexPositionRecorder rowIndexPosition;
- private final OrcProto.RowIndex.Builder rowIndex;
- private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
- private final PositionedOutputStream rowIndexStream;
- private final PositionedOutputStream bloomFilterStream;
- protected final BloomFilterIO bloomFilter;
- protected final boolean createBloomFilter;
- private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
- private final OrcProto.BloomFilter.Builder bloomFilterEntry;
- private boolean foundNulls;
- private OutStream isPresentOutStream;
- private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
- private final StreamFactory streamFactory;
-
- /**
- * Create a tree writer.
- * @param columnId the column id of the column to write
- * @param inspector the object inspector to use
- * @param schema the row schema
- * @param streamFactory limited access to the Writer's data.
- * @param nullable can the value be null?
- * @throws IOException
- */
- TreeWriter(int columnId, ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory streamFactory,
- boolean nullable) throws IOException {
- this.streamFactory = streamFactory;
- this.isCompressed = streamFactory.isCompressed();
- this.id = columnId;
- this.inspector = inspector;
- if (nullable) {
- isPresentOutStream = streamFactory.createStream(id,
- OrcProto.Stream.Kind.PRESENT);
- isPresent = new BitFieldWriter(isPresentOutStream, 1);
- } else {
- isPresent = null;
- }
- this.foundNulls = false;
- createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
- indexStatistics = ColumnStatisticsImpl.create(schema);
- stripeColStatistics = ColumnStatisticsImpl.create(schema);
- fileStatistics = ColumnStatisticsImpl.create(schema);
- childrenWriters = new TreeWriter[0];
- rowIndex = OrcProto.RowIndex.newBuilder();
- rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
- rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
- stripeStatsBuilders = Lists.newArrayList();
- if (streamFactory.buildIndex()) {
- rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
- } else {
- rowIndexStream = null;
- }
- if (createBloomFilter) {
- bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
- bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
- bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
- bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
- streamFactory.getBloomFilterFPP());
- } else {
- bloomFilterEntry = null;
- bloomFilterIndex = null;
- bloomFilterStream = null;
- bloomFilter = null;
- }
- }
-
- protected OrcProto.RowIndex.Builder getRowIndex() {
- return rowIndex;
- }
-
- protected ColumnStatisticsImpl getStripeStatistics() {
- return stripeColStatistics;
- }
-
- protected ColumnStatisticsImpl getFileStatistics() {
- return fileStatistics;
- }
-
- protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
- return rowIndexEntry;
- }
-
- IntegerWriter createIntegerWriter(PositionedOutputStream output,
- boolean signed, boolean isDirectV2,
- StreamFactory writer) {
- if (isDirectV2) {
- boolean alignedBitpacking = false;
- if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
- alignedBitpacking = true;
- }
- return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
- } else {
- return new RunLengthIntegerWriter(output, signed);
- }
- }
-
- boolean isNewWriteFormat(StreamFactory writer) {
- return writer.getVersion() != OrcFile.Version.V_0_11;
- }
-
- /**
- * Add a new value to the column.
- * @param obj the object to write
- * @throws IOException
- */
- void write(Object obj) throws IOException {
- if (obj != null) {
- indexStatistics.increment();
- } else {
- indexStatistics.setNull();
- }
- if (isPresent != null) {
- isPresent.write(obj == null ? 0 : 1);
- if(obj == null) {
- foundNulls = true;
- }
- }
- }
-
- /**
- * Handle the top level object write.
- *
- * This default method is used for all types except structs, which are the
- * typical case. VectorizedRowBatch assumes the top level object is a
- * struct, so we use the first column for all other types.
- * @param batch the batch to write from
- * @param offset the row to start on
- * @param length the number of rows to write
- * @throws IOException
- */
- void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException {
- writeBatch(batch.cols[0], offset, length);
- }
-
- /**
- * Write the values from the given vector from offset for length elements.
- * @param vector the vector to write from
- * @param offset the first value from the vector to write
- * @param length the number of values from the vector to write
- * @throws IOException
- */
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- if (vector.noNulls) {
- indexStatistics.increment(length);
- if (isPresent != null) {
- for (int i = 0; i < length; ++i) {
- isPresent.write(1);
- }
- }
- } else {
- if (vector.isRepeating) {
- boolean isNull = vector.isNull[0];
- if (isPresent != null) {
- for (int i = 0; i < length; ++i) {
- isPresent.write(isNull ? 0 : 1);
+ static void setColumn(int rowId, ColumnVector column,
+ ObjectInspector inspector, Object obj) {
+ if (obj == null) {
+ column.noNulls = false;
+ column.isNull[rowId] = true;
+ } else {
+ switch (inspector.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveObjectInspector) inspector)
+ .getPrimitiveCategory()) {
+ case BOOLEAN: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] =
+ ((BooleanObjectInspector) inspector).get(obj) ? 1 : 0;
+ break;
}
- }
- if (isNull) {
- foundNulls = true;
- indexStatistics.setNull();
- } else {
- indexStatistics.increment(length);
- }
- } else {
- // count the number of non-null values
- int nonNullCount = 0;
- for(int i = 0; i < length; ++i) {
- boolean isNull = vector.isNull[i + offset];
- if (!isNull) {
- nonNullCount += 1;
+ case BYTE: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((ByteObjectInspector) inspector).get(obj);
+ break;
}
- if (isPresent != null) {
- isPresent.write(isNull ? 0 : 1);
+ case SHORT: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] =
+ ((ShortObjectInspector) inspector).get(obj);
+ break;
+ }
+ case INT: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((IntObjectInspector) inspector).get(obj);
+ break;
+ }
+ case LONG: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((LongObjectInspector) inspector).get(obj);
+ break;
+ }
+ case FLOAT: {
+ DoubleColumnVector vector = (DoubleColumnVector) column;
+ vector.vector[rowId] =
+ ((FloatObjectInspector) inspector).get(obj);
+ break;
+ }
+ case DOUBLE: {
+ DoubleColumnVector vector = (DoubleColumnVector) column;
+ vector.vector[rowId] =
+ ((DoubleObjectInspector) inspector).get(obj);
+ break;
+ }
+ case BINARY: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ BytesWritable blob = ((BinaryObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj);
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case STRING: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Text blob = ((StringObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj);
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case VARCHAR: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Text blob = ((HiveVarcharObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj).getTextValue();
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case CHAR: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Text blob = ((HiveCharObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj).getTextValue();
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case TIMESTAMP: {
+ LongColumnVector vector = (LongColumnVector) column;
+ Timestamp ts = ((TimestampObjectInspector) inspector)
+ .getPrimitiveJavaObject(obj);
+ vector.vector[rowId] = ts.getTime() * NANOS_PER_MILLI +
+ (ts.getNanos() % NANOS_PER_MILLI);
+ break;
+ }
+ case DATE: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((DateObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj).getDays();
+ break;
+ }
+ case DECIMAL: {
+ DecimalColumnVector vector = (DecimalColumnVector) column;
+ vector.set(rowId, ((HiveDecimalObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj));
+ break;
}
}
- indexStatistics.increment(nonNullCount);
- if (nonNullCount != length) {
- foundNulls = true;
- indexStatistics.setNull();
- }
- }
- }
- }
-
- private void removeIsPresentPositions() {
- for(int i=0; i < rowIndex.getEntryCount(); ++i) {
- OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
- List<Long> positions = entry.getPositionsList();
- // bit streams use 3 positions if uncompressed, 4 if compressed
- positions = positions.subList(isCompressed ? 4 : 3, positions.size());
- entry.clearPositions();
- entry.addAllPositions(positions);
- }
- }
-
- /**
- * Write the stripe out to the file.
- * @param builder the stripe footer that contains the information about the
- * layout of the stripe. The TreeWriter is required to update
- * the footer with its information.
- * @param requiredIndexEntries the number of index entries that are
- * required. this is to check to make sure the
- * row index is well formed.
- * @throws IOException
- */
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- if (isPresent != null) {
- isPresent.flush();
-
- // if no nulls are found in a stream, then suppress the stream
- if(!foundNulls) {
- isPresentOutStream.suppress();
- // since isPresent bitstream is suppressed, update the index to
- // remove the positions of the isPresent stream
- if (rowIndexStream != null) {
- removeIsPresentPositions();
+ break;
+ case STRUCT: {
+ StructColumnVector vector = (StructColumnVector) column;
+ StructObjectInspector oi = (StructObjectInspector) inspector;
+ List<? extends StructField> fields = oi.getAllStructFieldRefs();
+ for (int c = 0; c < vector.fields.length; ++c) {
+ StructField field = fields.get(c);
+ setColumn(rowId, vector.fields[c], field.getFieldObjectInspector(),
+ oi.getStructFieldData(obj, field));
}
+ break;
}
- }
-
- // merge stripe-level column statistics to file statistics and write it to
- // stripe statistics
- OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
- writeStripeStatistics(stripeStatsBuilder, this);
- stripeStatsBuilders.add(stripeStatsBuilder);
-
- // reset the flag for next stripe
- foundNulls = false;
-
- builder.addColumns(getEncoding());
- if (streamFactory.hasWriterTimeZone()) {
- builder.setWriterTimezone(TimeZone.getDefault().getID());
- }
- if (rowIndexStream != null) {
- if (rowIndex.getEntryCount() != requiredIndexEntries) {
- throw new IllegalArgumentException("Column has wrong number of " +
- "index entries found: " + rowIndex.getEntryCount() + " expected: " +
- requiredIndexEntries);
+ case UNION: {
+ UnionColumnVector vector = (UnionColumnVector) column;
+ UnionObjectInspector oi = (UnionObjectInspector) inspector;
+ int tag = oi.getTag(obj);
+ vector.tags[rowId] = tag;
+ setColumn(rowId, vector.fields[tag],
+ oi.getObjectInspectors().get(tag), oi.getField(obj));
+ break;
}
- rowIndex.build().writeTo(rowIndexStream);
- rowIndexStream.flush();
- }
- rowIndex.clear();
- rowIndexEntry.clear();
-
- // write the bloom filter to out stream
- if (bloomFilterStream != null) {
- bloomFilterIndex.build().writeTo(bloomFilterStream);
- bloomFilterStream.flush();
- bloomFilterIndex.clear();
- bloomFilterEntry.clear();
- }
- }
-
- private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
- TreeWriter treeWriter) {
- treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
- builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
- treeWriter.stripeColStatistics.reset();
- for (TreeWriter child : treeWriter.getChildrenWriters()) {
- writeStripeStatistics(builder, child);
- }
- }
-
- TreeWriter[] getChildrenWriters() {
- return childrenWriters;
- }
-
- /**
- * Get the encoding for this column.
- * @return the information about the encoding of this column
- */
- OrcProto.ColumnEncoding getEncoding() {
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- /**
- * Create a row index entry with the previous location and the current
- * index statistics. Also merges the index statistics into the file
- * statistics before they are cleared. Finally, it records the start of the
- * next index and ensures all of the children columns also create an entry.
- * @throws IOException
- */
- void createRowIndexEntry() throws IOException {
- stripeColStatistics.merge(indexStatistics);
- rowIndexEntry.setStatistics(indexStatistics.serialize());
- indexStatistics.reset();
- rowIndex.addEntry(rowIndexEntry);
- rowIndexEntry.clear();
- addBloomFilterEntry();
- recordPosition(rowIndexPosition);
- for(TreeWriter child: childrenWriters) {
- child.createRowIndexEntry();
- }
- }
-
- void addBloomFilterEntry() {
- if (createBloomFilter) {
- bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
- bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
- bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
- bloomFilter.reset();
- bloomFilterEntry.clear();
- }
- }
-
- /**
- * Record the current position in each of this column's streams.
- * @param recorder where should the locations be recorded
- * @throws IOException
- */
- void recordPosition(PositionRecorder recorder) throws IOException {
- if (isPresent != null) {
- isPresent.getPosition(recorder);
- }
- }
-
- /**
- * Estimate how much memory the writer is consuming excluding the streams.
- * @return the number of bytes.
- */
- long estimateMemory() {
- long result = 0;
- for (TreeWriter child: childrenWriters) {
- result += child.estimateMemory();
- }
- return result;
- }
- }
-
- private static class BooleanTreeWriter extends TreeWriter {
- private final BitFieldWriter writer;
-
- BooleanTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- PositionedOutputStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.writer = new BitFieldWriter(out, 1);
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- boolean val = ((BooleanObjectInspector) inspector).get(obj);
- indexStatistics.updateBoolean(val, 1);
- writer.write(val ? 1 : 0);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int value = vec.vector[0] == 0 ? 0 : 1;
- indexStatistics.updateBoolean(value != 0, length);
- for(int i=0; i < length; ++i) {
- writer.write(value);
+ case LIST: {
+ ListColumnVector vector = (ListColumnVector) column;
+ ListObjectInspector oi = (ListObjectInspector) inspector;
+ int offset = vector.childCount;
+ int length = oi.getListLength(obj);
+ vector.offsets[rowId] = offset;
+ vector.lengths[rowId] = length;
+ vector.child.ensureSize(offset + length, true);
+ vector.childCount += length;
+ for (int c = 0; c < length; ++c) {
+ setColumn(offset + c, vector.child,
+ oi.getListElementObjectInspector(),
+ oi.getListElement(obj, c));
}
+ break;
}
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int value = vec.vector[i + offset] == 0 ? 0 : 1;
- writer.write(value);
- indexStatistics.updateBoolean(value != 0, 1);
+ case MAP: {
+ MapColumnVector vector = (MapColumnVector) column;
+ MapObjectInspector oi = (MapObjectInspector) inspector;
+ int offset = vector.childCount;
+ Set map = oi.getMap(obj).entrySet();
+ int length = map.size();
+ vector.offsets[rowId] = offset;
+ vector.lengths[rowId] = length;
+ vector.keys.ensureSize(offset + length, true);
+ vector.values.ensureSize(offset + length, true);
+ vector.childCount += length;
+ for (Object item: map) {
+ Map.Entry pair = (Map.Entry) item;
+ setColumn(offset, vector.keys, oi.getMapKeyObjectInspector(),
+ pair.getKey());
+ setColumn(offset, vector.values, oi.getMapValueObjectInspector(),
+ pair.getValue());
+ offset += 1;
}
+ break;
}
+ default:
+ throw new IllegalArgumentException("Unknown ObjectInspector kind " +
+ inspector.getCategory());
}
}
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
}
- private static class ByteTreeWriter extends TreeWriter {
- private final RunLengthByteWriter writer;
-
- ByteTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.writer = new RunLengthByteWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA));
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- byte val = ((ByteObjectInspector) inspector).get(obj);
- indexStatistics.updateInteger(val, 1);
- if (createBloomFilter) {
- bloomFilter.addLong(val);
- }
- writer.write(val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- byte value = (byte) vec.vector[0];
- indexStatistics.updateInteger(value, length);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- byte value = (byte) vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateInteger(value, 1);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
+ void flushInternalBatch() throws IOException {
+ if (internalBatch.size != 0) {
+ super.addRowBatch(internalBatch);
+ internalBatch.reset();
}
}
- private static class IntegerTreeWriter extends TreeWriter {
- private final IntegerWriter writer;
- private final ShortObjectInspector shortInspector;
- private final IntObjectInspector intInspector;
- private final LongObjectInspector longInspector;
- private boolean isDirectV2 = true;
-
- IntegerTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- OutStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- if (inspector instanceof IntObjectInspector) {
- intInspector = (IntObjectInspector) inspector;
- shortInspector = null;
- longInspector = null;
- } else {
- intInspector = null;
- if (inspector instanceof LongObjectInspector) {
- longInspector = (LongObjectInspector) inspector;
- shortInspector = null;
- } else {
- shortInspector = (ShortObjectInspector) inspector;
- longInspector = null;
- }
- }
- recordPosition(rowIndexPosition);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- long val;
- if (intInspector != null) {
- val = intInspector.get(obj);
- } else if (longInspector != null) {
- val = longInspector.get(obj);
- } else {
- val = shortInspector.get(obj);
- }
- indexStatistics.updateInteger(val, 1);
- if (createBloomFilter) {
- // integers are converted to longs in column statistics and during SARG evaluation
- bloomFilter.addLong(val);
- }
- writer.write(val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- long value = vec.vector[0];
- indexStatistics.updateInteger(value, length);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- long value = vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateInteger(value, 1);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- }
- }
+ @Override
+ public void addRow(Object row) throws IOException {
+ int rowId = internalBatch.size++;
+ if (fields != null) {
+ StructObjectInspector soi = (StructObjectInspector) inspector;
+ for(int i=0; i < fields.length; ++i) {
+ setColumn(rowId, internalBatch.cols[i],
+ fields[i].getFieldObjectInspector(),
+ soi.getStructFieldData(row, fields[i]));
}
+ } else {
+ setColumn(rowId, internalBatch.cols[0], inspector, row);
}
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
+ if (internalBatch.size == internalBatch.getMaxSize()) {
+ flushInternalBatch();
}
}
- private static class FloatTreeWriter extends TreeWriter {
- private final PositionedOutputStream stream;
- private final SerializationUtils utils;
+ @Override
+ public long writeIntermediateFooter() throws IOException {
+ flushInternalBatch();
+ return super.writeIntermediateFooter();
+ }
- FloatTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.utils = new SerializationUtils();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- float val = ((FloatObjectInspector) inspector).get(obj);
- indexStatistics.updateDouble(val);
- if (createBloomFilter) {
- // floats are converted to doubles in column statistics and during SARG evaluation
- bloomFilter.addDouble(val);
- }
- utils.writeFloat(stream, val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DoubleColumnVector vec = (DoubleColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- float value = (float) vec.vector[0];
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- for(int i=0; i < length; ++i) {
- utils.writeFloat(stream, value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- float value = (float) vec.vector[i + offset];
- utils.writeFloat(stream, value);
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- }
- }
- }
- }
-
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- stream.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- }
- }
-
- private static class DoubleTreeWriter extends TreeWriter {
- private final PositionedOutputStream stream;
- private final SerializationUtils utils;
-
- DoubleTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.utils = new SerializationUtils();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- double val = ((DoubleObjectInspector) inspector).get(obj);
- indexStatistics.updateDouble(val);
- if (createBloomFilter) {
- bloomFilter.addDouble(val);
- }
- utils.writeDouble(stream, val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DoubleColumnVector vec = (DoubleColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- double value = vec.vector[0];
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- for(int i=0; i < length; ++i) {
- utils.writeDouble(stream, value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- double value = vec.vector[i + offset];
- utils.writeDouble(stream, value);
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- stream.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- }
- }
-
- private static abstract class StringBaseTreeWriter extends TreeWriter {
- private static final int INITIAL_DICTIONARY_SIZE = 4096;
- private final OutStream stringOutput;
- private final IntegerWriter lengthOutput;
- private final IntegerWriter rowOutput;
- protected final StringRedBlackTree dictionary =
- new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
- protected final DynamicIntArray rows = new DynamicIntArray();
- protected final PositionedOutputStream directStreamOutput;
- protected final IntegerWriter directLengthOutput;
- private final List<OrcProto.RowIndexEntry> savedRowIndex =
- new ArrayList<OrcProto.RowIndexEntry>();
- private final boolean buildIndex;
- private final List<Long> rowIndexValueCount = new ArrayList<Long>();
- // If the number of keys in a dictionary is greater than this fraction of
- //the total number of non-null rows, turn off dictionary encoding
- private final double dictionaryKeySizeThreshold;
- protected boolean useDictionaryEncoding = true;
- private boolean isDirectV2 = true;
- private boolean doneDictionaryCheck;
- private final boolean strideDictionaryCheck;
-
- StringBaseTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- stringOutput = writer.createStream(id,
- OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- rowOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(0L);
- buildIndex = writer.buildIndex();
- directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- directLengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- Configuration conf = writer.getConfiguration();
- dictionaryKeySizeThreshold =
- OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
- strideDictionaryCheck =
- OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
- doneDictionaryCheck = false;
- }
-
- /**
- * Method to retrieve text values from the value object, which can be overridden
- * by subclasses.
- * @param obj value
- * @return Text text value from obj
- */
- Text getTextValue(Object obj) {
- return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- Text val = getTextValue(obj);
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(val));
- } else {
- // write data and length
- directStreamOutput.write(val.getBytes(), 0, val.getLength());
- directLengthOutput.write(val.getLength());
- }
- indexStatistics.updateString(val);
- if (createBloomFilter) {
- bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
- }
- }
- }
-
- private boolean checkDictionaryEncoding() {
- if (!doneDictionaryCheck) {
- // Set the flag indicating whether or not to use dictionary encoding
- // based on whether or not the fraction of distinct keys over number of
- // non-null rows is less than the configured threshold
- float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
- useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
- doneDictionaryCheck = true;
- }
- return useDictionaryEncoding;
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
- // checking would not have happened. So do it again here.
- checkDictionaryEncoding();
-
- if (useDictionaryEncoding) {
- flushDictionary();
- } else {
- // flushout any left over entries from dictionary
- if (rows.size() > 0) {
- flushDictionary();
- }
-
- // suppress the stream for every stripe if dictionary is disabled
- stringOutput.suppress();
- }
-
- // we need to build the rowindex before calling super, since it
- // writes it out.
- super.writeStripe(builder, requiredIndexEntries);
- stringOutput.flush();
- lengthOutput.flush();
- rowOutput.flush();
- directStreamOutput.flush();
- directLengthOutput.flush();
- // reset all of the fields to be ready for the next stripe.
- dictionary.clear();
- savedRowIndex.clear();
- rowIndexValueCount.clear();
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(0L);
-
- if (!useDictionaryEncoding) {
- // record the start positions of first index stride of next stripe i.e
- // beginning of the direct streams when dictionary is disabled
- recordDirectStreamPosition();
- }
- }
-
- private void flushDictionary() throws IOException {
- final int[] dumpOrder = new int[dictionary.size()];
-
- if (useDictionaryEncoding) {
- // Write the dictionary by traversing the red-black tree writing out
- // the bytes and lengths; and creating the map from the original order
- // to the final sorted order.
-
- dictionary.visit(new StringRedBlackTree.Visitor() {
- private int currentId = 0;
- @Override
- public void visit(StringRedBlackTree.VisitorContext context
- ) throws IOException {
- context.writeBytes(stringOutput);
- lengthOutput.write(context.getLength());
- dumpOrder[context.getOriginalPosition()] = currentId++;
- }
- });
- } else {
- // for direct encoding, we don't want the dictionary data stream
- stringOutput.suppress();
- }
- int length = rows.size();
- int rowIndexEntry = 0;
- OrcProto.RowIndex.Builder rowIndex = getRowIndex();
- Text text = new Text();
- // write the values translated into the dump order.
- for(int i = 0; i <= length; ++i) {
- // now that we are writing out the row values, we can finalize the
- // row index
- if (buildIndex) {
- while (i == rowIndexValueCount.get(rowIndexEntry) &&
- rowIndexEntry < savedRowIndex.size()) {
- OrcProto.RowIndexEntry.Builder base =
- savedRowIndex.get(rowIndexEntry++).toBuilder();
- if (useDictionaryEncoding) {
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- } else {
- PositionRecorder posn = new RowIndexPositionRecorder(base);
- directStreamOutput.getPosition(posn);
- directLengthOutput.getPosition(posn);
- }
- rowIndex.addEntry(base.build());
- }
- }
- if (i != length) {
- if (useDictionaryEncoding) {
- rowOutput.write(dumpOrder[rows.get(i)]);
- } else {
- dictionary.getText(text, rows.get(i));
- directStreamOutput.write(text.getBytes(), 0, text.getLength());
- directLengthOutput.write(text.getLength());
- }
- }
- }
- rows.clear();
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- // Returns the encoding used for the last call to writeStripe
- if (useDictionaryEncoding) {
- if(isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
- setDictionarySize(dictionary.size()).build();
- }
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DICTIONARY).
- setDictionarySize(dictionary.size()).build();
- } else {
- if(isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
- }
-
- /**
- * This method doesn't call the super method, because unlike most of the
- * other TreeWriters, this one can't record the position in the streams
- * until the stripe is being flushed. Therefore it saves all of the entries
- * and augments them with the final information as the stripe is written.
- * @throws IOException
- */
- @Override
- void createRowIndexEntry() throws IOException {
- getStripeStatistics().merge(indexStatistics);
- OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
- rowIndexEntry.setStatistics(indexStatistics.serialize());
- indexStatistics.reset();
- OrcProto.RowIndexEntry base = rowIndexEntry.build();
- savedRowIndex.add(base);
- rowIndexEntry.clear();
- addBloomFilterEntry();
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(Long.valueOf(rows.size()));
- if (strideDictionaryCheck) {
- checkDictionaryEncoding();
- }
- if (!useDictionaryEncoding) {
- if (rows.size() > 0) {
- flushDictionary();
- // just record the start positions of next index stride
- recordDirectStreamPosition();
- } else {
- // record the start positions of next index stride
- recordDirectStreamPosition();
- getRowIndex().addEntry(base);
- }
- }
- }
-
- private void recordDirectStreamPosition() throws IOException {
- directStreamOutput.getPosition(rowIndexPosition);
- directLengthOutput.getPosition(rowIndexPosition);
- }
-
- @Override
- long estimateMemory() {
- return rows.getSizeInBytes() + dictionary.getSizeInBytes();
- }
- }
-
- private static class StringTreeWriter extends StringBaseTreeWriter {
- StringTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- if (useDictionaryEncoding) {
- int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(vec.vector[0], vec.start[0],
- vec.length[0]);
- directLengthOutput.write(vec.length[0]);
- }
- }
- indexStatistics.updateString(vec.vector[0], vec.start[0],
- vec.length[0], length);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]));
- } else {
- directStreamOutput.write(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- directLengthOutput.write(vec.length[offset + i]);
- }
- indexStatistics.updateString(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i], 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
- }
-
- /**
- * Under the covers, char is written to ORC the same way as string.
- */
- private static class CharTreeWriter extends StringBaseTreeWriter {
- private final int itemLength;
- private final byte[] padding;
-
- CharTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- itemLength = schema.getMaxLength();
- padding = new byte[itemLength];
- }
-
- /**
- * Override base class implementation to support char values.
- */
- @Override
- Text getTextValue(Object obj) {
- return (((HiveCharObjectInspector) inspector)
- .getPrimitiveWritableObject(obj)).getTextValue();
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- byte[] ptr;
- int ptrOffset;
- if (vec.length[0] >= itemLength) {
- ptr = vec.vector[0];
- ptrOffset = vec.start[0];
- } else {
- ptr = padding;
- ptrOffset = 0;
- System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
- vec.length[0]);
- Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
- }
- if (useDictionaryEncoding) {
- int id = dictionary.add(ptr, ptrOffset, itemLength);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(ptr, ptrOffset, itemLength);
- directLengthOutput.write(itemLength);
- }
- }
- indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
- if (createBloomFilter) {
- bloomFilter.addBytes(ptr, ptrOffset, itemLength);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- byte[] ptr;
- int ptrOffset;
- if (vec.length[offset + i] >= itemLength) {
- ptr = vec.vector[offset + i];
- ptrOffset = vec.start[offset + i];
- } else {
- // it is the wrong length, so copy it
- ptr = padding;
- ptrOffset = 0;
- System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
- ptr, 0, vec.length[offset + i]);
- Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
- }
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(ptr, ptrOffset, itemLength));
- } else {
- directStreamOutput.write(ptr, ptrOffset, itemLength);
- directLengthOutput.write(itemLength);
- }
- indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(ptr, ptrOffset, itemLength);
- }
- }
- }
- }
- }
- }
-
- /**
- * Under the covers, varchar is written to ORC the same way as string.
- */
- private static class VarcharTreeWriter extends StringBaseTreeWriter {
- private final int maxLength;
-
- VarcharTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- maxLength = schema.getMaxLength();
- }
-
- /**
- * Override base class implementation to support varchar values.
- */
- @Override
- Text getTextValue(Object obj) {
- return (((HiveVarcharObjectInspector) inspector)
- .getPrimitiveWritableObject(obj)).getTextValue();
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int itemLength = Math.min(vec.length[0], maxLength);
- if (useDictionaryEncoding) {
- int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(vec.vector[0], vec.start[0],
- itemLength);
- directLengthOutput.write(itemLength);
- }
- }
- indexStatistics.updateString(vec.vector[0], vec.start[0],
- itemLength, length);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int itemLength = Math.min(vec.length[offset + i], maxLength);
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(vec.vector[offset + i],
- vec.start[offset + i], itemLength));
- } else {
- directStreamOutput.write(vec.vector[offset + i],
- vec.start[offset + i], itemLength);
- directLengthOutput.write(itemLength);
- }
- indexStatistics.updateString(vec.vector[offset + i],
- vec.start[offset + i], itemLength, 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], itemLength);
- }
- }
- }
- }
- }
- }
-
- private static class BinaryTreeWriter extends TreeWriter {
- private final PositionedOutputStream stream;
- private final IntegerWriter length;
- private boolean isDirectV2 = true;
-
- BinaryTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.length = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- BytesWritable val =
- ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
- stream.write(val.getBytes(), 0, val.getLength());
- length.write(val.getLength());
- indexStatistics.updateBinary(val);
- if (createBloomFilter) {
- bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
- }
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- for(int i=0; i < length; ++i) {
- stream.write(vec.vector[0], vec.start[0],
- vec.length[0]);
- this.length.write(vec.length[0]);
- }
- indexStatistics.updateBinary(vec.vector[0], vec.start[0],
- vec.length[0], length);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- stream.write(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- this.length.write(vec.length[offset + i]);
- indexStatistics.updateBinary(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i], 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
-
-
- @Override // Finishes the current stripe for this binary column's two streams.
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries); // parent stripe handling runs first
- stream.flush(); // flush the DATA stream that holds the raw value bytes
- length.flush(); // flush the LENGTH writer that holds each value's byte count
- recordPosition(rowIndexPosition); // re-record stream positions only after both flushes
- }
-
- @Override // Adds this column's stream positions to the given recorder.
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder); // parent positions are recorded first
- stream.getPosition(recorder); // then the value-bytes stream
- length.getPosition(recorder); // then the lengths writer; NOTE(review): readers presumably depend on this order - confirm before reordering
- }
- }
-
- static final int MILLIS_PER_SECOND = 1000; // milliseconds in one second
- static final int NANOS_PER_SECOND = 1000000000; // nanoseconds in one second
- static final int MILLIS_PER_NANO = 1000000; // NOTE(review): despite the name this is nanoseconds per millisecond; it is used as a divisor turning nanos into millis
- static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; // parsed with Timestamp.valueOf to get the base second subtracted from written seconds
-
- /**
-  * Tree writer for timestamp columns.
-  *
-  * Each value is split into two integer streams: whole seconds relative to
-  * {@code BASE_TIMESTAMP_STRING} go to the DATA stream (signed), and the
-  * sub-second nanoseconds, packed by {@link #formatNanos(int)}, go to the
-  * SECONDARY stream (unsigned).
-  */
- private static class TimestampTreeWriter extends TreeWriter {
-   private final IntegerWriter seconds;
-   private final IntegerWriter nanos;
-   private final boolean isDirectV2;
-   private final long base_timestamp;
-
-   /**
-    * Creates the seconds and nanos streams for the column and records the
-    * initial stream positions.
-    *
-    * @throws IOException if a stream cannot be created or positioned
-    */
-   TimestampTreeWriter(int columnId,
-                       ObjectInspector inspector,
-                       TypeDescription schema,
-                       StreamFactory writer,
-                       boolean nullable) throws IOException {
-     super(columnId, inspector, schema, writer, nullable);
-     this.isDirectV2 = isNewWriteFormat(writer);
-     this.seconds = createIntegerWriter(writer.createStream(id,
-         OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
-     this.nanos = createIntegerWriter(writer.createStream(id,
-         OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
-     recordPosition(rowIndexPosition);
-     // for unit tests to set different time zones
-     this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
-     writer.useWriterTimeZone(true);
-   }
-
-   /** Returns DIRECT_V2 for the new write format, DIRECT otherwise. */
-   @Override
-   OrcProto.ColumnEncoding getEncoding() {
-     if (isDirectV2) {
-       return OrcProto.ColumnEncoding.newBuilder()
-           .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
-     }
-     return OrcProto.ColumnEncoding.newBuilder()
-         .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
-   }
-
-   /**
-    * Writes one timestamp obtained through the object inspector; a null
-    * object is only passed to the parent.
-    */
-   @Override
-   void write(Object obj) throws IOException {
-     super.write(obj);
-     if (obj != null) {
-       Timestamp val =
-           ((TimestampObjectInspector) inspector).
-               getPrimitiveJavaObject(obj);
-       indexStatistics.updateTimestamp(val);
-       seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp);
-       nanos.write(formatNanos(val.getNanos()));
-       if (createBloomFilter) {
-         bloomFilter.addLong(val.getTime());
-       }
-     }
-   }
-
-   /**
-    * Writes {@code length} rows of a long column vector whose elements are
-    * timestamps expressed in nanoseconds.
-    *
-    * Both the repeating and the per-row path normalize the sub-second part
-    * through {@link #nanosOfSecond(long)}. Previously only the per-row path
-    * corrected a negative remainder, so a repeated negative value sent a
-    * negative number to the unsigned nanos stream.
-    */
-   @Override
-   void writeBatch(ColumnVector vector, int offset,
-                   int length) throws IOException {
-     super.writeBatch(vector, offset, length);
-     LongColumnVector vec = (LongColumnVector) vector;
-     if (vector.isRepeating) {
-       if (vector.noNulls || !vector.isNull[0]) {
-         long value = vec.vector[0];
-         long valueMillis = value / MILLIS_PER_NANO;
-         indexStatistics.updateTimestamp(valueMillis);
-         if (createBloomFilter) {
-           bloomFilter.addLong(valueMillis);
-         }
-         final long secs = value / NANOS_PER_SECOND - base_timestamp;
-         final long nano = formatNanos(nanosOfSecond(value));
-         for (int i = 0; i < length; ++i) {
-           seconds.write(secs);
-           nanos.write(nano);
-         }
-       }
-     } else {
-       for (int i = 0; i < length; ++i) {
-         if (vec.noNulls || !vec.isNull[i + offset]) {
-           long value = vec.vector[i + offset];
-           long valueMillis = value / MILLIS_PER_NANO;
-           long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
-           seconds.write(valueSecs);
-           nanos.write(formatNanos(nanosOfSecond(value)));
-           indexStatistics.updateTimestamp(valueMillis);
-           if (createBloomFilter) {
-             bloomFilter.addLong(valueMillis);
-           }
-         }
-       }
-     }
-   }
-
-   /**
-    * Flushes both integer streams after the parent's stripe handling and
-    * then re-records the stream positions.
-    */
-   @Override
-   void writeStripe(OrcProto.StripeFooter.Builder builder,
-                    int requiredIndexEntries) throws IOException {
-     super.writeStripe(builder, requiredIndexEntries);
-     seconds.flush();
-     nanos.flush();
-     recordPosition(rowIndexPosition);
-   }
-
-   /**
-    * Returns the sub-second part of a nanosecond timestamp as a value in
-    * {@code [0, NANOS_PER_SECOND)}. Java's {@code %} keeps the sign of the
-    * dividend, so a negative remainder is shifted up by one second.
-    */
-   private static int nanosOfSecond(long value) {
-     int result = (int) (value % NANOS_PER_SECOND);
-     if (result < 0) {
-       result += NANOS_PER_SECOND;
-     }
-     return result;
-   }
-
-   /**
-    * Packs a nanosecond count into the form written to the nanos stream.
-    *
-    * Zero stays zero. A value not divisible by 100 is shifted left by three
-    * bits with the low bits left at zero. Otherwise trailing decimal zeros
-    * are stripped (at least two, at most eight) and the low three bits hold
-    * the number of zeros removed minus one.
-    */
-   private static long formatNanos(int nanos) {
-     if (nanos == 0) {
-       return 0;
-     } else if (nanos % 100 != 0) {
-       return ((long) nanos) << 3;
-     } else {
-       nanos /= 100;
-       int trailingZeros = 1;
-       while (nanos % 10 == 0 && trailingZeros < 7) {
-         nanos /= 10;
-         trailingZeros += 1;
-       }
-       return ((long) nanos) << 3 | trailingZeros;
-     }
-   }
-
-   /**
-    * Records the parent's positions, then the seconds stream, then the
-    * nanos stream.
-    */
-   @Override
-   void recordPosition(PositionRecorder recorder) throws IOException {
-     super.recordPosition(recorder);
-     seconds.getPosition(recorder);
-     nanos.getPosition(recorder);
-   }
- }
-
- /**
-  * Tree writer for date columns. Each date is written as a signed integer
-  * day count into the column's DATA stream.
-  */
- private static class DateTreeWriter extends TreeWriter {
-   private final IntegerWriter writer;
-   private final boolean isDirectV2;
-
-   /** Creates the DATA stream and records the initial stream position. */
-   DateTreeWriter(int columnId,
-                  ObjectInspector inspector,
-                  TypeDescription schema,
-                  StreamFactory writer,
-                  boolean nullable) throws IOException {
-     super(columnId, inspector, schema, writer, nullable);
-     final OutStream dataStream =
-         writer.createStream(id, OrcProto.Stream.Kind.DATA);
-     this.isDirectV2 = isNewWriteFormat(writer);
-     this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
-     recordPosition(rowIndexPosition);
-   }
-
-   /**
-    * Writes one date obtained through the object inspector; a null object
-    * is only passed to the parent.
-    */
-   @Override
-   void write(Object obj) throws IOException {
-     super.write(obj);
-     if (obj == null) {
-       return;
-     }
-     // The writable serves both the statistics update and the day count.
-     final DateWritable date =
-         ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
-     indexStatistics.updateDate(date);
-     writer.write(date.getDays());
-     if (createBloomFilter) {
-       bloomFilter.addLong(date.getDays());
-     }
-   }
-
-   /**
-    * Writes {@code length} rows of a long column vector, narrowing each
-    * element to an int day count. A repeating vector updates statistics
-    * once and then writes element 0 for every row.
-    */
-   @Override
-   void writeBatch(ColumnVector vector, int offset,
-                   int length) throws IOException {
-     super.writeBatch(vector, offset, length);
-     final LongColumnVector vec = (LongColumnVector) vector;
-
-     if (!vector.isRepeating) {
-       for (int row = 0; row < length; ++row) {
-         final int idx = row + offset;
-         if (!vec.noNulls && vec.isNull[idx]) {
-           continue;
-         }
-         final int days = (int) vec.vector[idx];
-         writer.write(days);
-         indexStatistics.updateDate(days);
-         if (createBloomFilter) {
-           bloomFilter.addLong(days);
-         }
-       }
-       return;
-     }
-
-     if (!vector.noNulls && vector.isNull[0]) {
-       return;
-     }
-     final int days = (int) vec.vector[0];
-     indexStatistics.updateDate(days);
-     if (createBloomFilter) {
-       bloomFilter.addLong(days);
-     }
-     int remaining = length;
-     while (remaining-- > 0) {
-       writer.write(days);
-     }
-   }
-
-   /** Flushes the integer writer and then re-records the stream position. */
-   @Override
-   void writeStripe(OrcProto.StripeFooter.Builder builder,
-                    int requiredIndexEntries) throws IOException {
-     super.writeStripe(builder, requiredIndexEntries);
-     writer.flush();
-     recordPosition(rowIndexPosition);
-   }
-
-   /** Records the parent's positions followed by the integer writer's. */
-   @Override
-   void recordPosition(PositionRecorder recorder) throws IOException {
-     super.recordPosition(recorder);
-     writer.getPosition(recorder);
-   }
-
-   /** Returns DIRECT_V2 for the new write format, DIRECT otherwise. */
-   @Override
-   OrcProto.ColumnEncoding getEncoding() {
-     final OrcProto.ColumnEncoding.Kind kind = isDirectV2
-         ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
-         : OrcProto.ColumnEncoding.Kind.DIRECT;
-     return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
-   }
- }
-
- /**
-  * Tree writer for decimal columns. The unscaled value is serialized as a
-  * big integer into the DATA stream and the scale is written as a signed
-  * integer into the SECONDARY stream.
-  */
- private static class DecimalTreeWriter extends TreeWriter {
-   private final PositionedOutputStream valueStream;
-   private final IntegerWriter scaleStream;
-   private final boolean isDirectV2;
-
-   /** Creates both streams and records the initial stream positions. */
-   DecimalTreeWriter(int columnId,
-                     ObjectInspector inspector,
-                     TypeDescription schema,
-                     StreamFactory writer,
-                     boolean nullable) throws IOException {
-     super(columnId, inspector, schema, writer, nullable);
-     this.isDirectV2 = isNewWriteFormat(writer);
-     this.valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
-     this.scaleStream = createIntegerWriter(
-         writer.createStream(id, OrcProto.Stream.Kind.SECONDARY),
-         true, isDirectV2, writer);
-     recordPosition(rowIndexPosition);
-   }
-
-   /** Returns DIRECT_V2 for the new write format, DIRECT otherwise. */
-   @Override
-   OrcProto.ColumnEncoding getEncoding() {
-     final OrcProto.ColumnEncoding.Kind kind = isDirectV2
-         ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
-         : OrcProto.ColumnEncoding.Kind.DIRECT;
-     return OrcProto.ColumnEncoding.newBuilder().setKind(kind).build();
-   }
-
-   /**
-    * Writes one decimal obtained through the object inspector. Nothing is
-    * written to the streams when the object is null or when the inspector
-    * yields a null decimal.
-    */
-   @Override
-   void write(Object obj) throws IOException {
-     super.write(obj);
-     if (obj == null) {
-       return;
-     }
-     final HiveDecimal dec =
-         ((HiveDecimalObjectInspector) inspector).getPrimitiveJavaObject(obj);
-     if (dec == null) {
-       return;
-     }
-     SerializationUtils.writeBigInteger(valueStream, dec.unscaledValue());
-     scaleStream.write(dec.scale());
-     indexStatistics.updateDecimal(dec);
-     if (createBloomFilter) {
-       bloomFilter.addString(dec.toString());
-     }
-   }
-
-   /**
-    * Writes {@code length} rows of a decimal column vector starting at
-    * {@code offset}. A repeating vector updates statistics once and then
-    * writes element 0 for every row.
-    */
-   @Override
-   void writeBatch(ColumnVector vector, int offset,
-                   int length) throws IOException {
-     super.writeBatch(vector, offset, length);
-     final DecimalColumnVector vec = (DecimalColumnVector) vector;
-
-     if (!vector.isRepeating) {
-       for (int row = 0; row < length; ++row) {
-         final int idx = row + offset;
-         if (!vec.noNulls && vec.isNull[idx]) {
-           continue;
-         }
-         final HiveDecimal dec = vec.vector[idx].getHiveDecimal();
-         SerializationUtils.writeBigInteger(valueStream, dec.unscaledValue());
-         scaleStream.write(dec.scale());
-         indexStatistics.updateDecimal(dec);
-         if (createBloomFilter) {
-           bloomFilter.addString(dec.toString());
-         }
-       }
-       return;
-     }
-
-     if (!vector.noNulls && vector.isNull[0]) {
-       return;
-     }
-     final HiveDecimal dec = vec.vector[0].getHiveDecimal();
-     indexStatistics.updateDecimal(dec);
-     if (createBloomFilter) {
-       bloomFilter.addString(dec.toString());
-     }
-     int remaining = length;
-     while (remaining-- > 0) {
-       SerializationUtils.writeBigInteger(valueStream, dec.unscaledValue());
-       scaleStream.write(dec.scale());
-     }
-   }
-
-   /** Flushes value then scale, and re-records the stream positions. */
-   @Override
-   void writeStripe(OrcProto.StripeFooter.Builder builder,
-                    int requiredIndexEntries) throws IOException {
-     super.writeStripe(builder, requiredIndexEntries);
-     valueStream.flush();
-     scaleStream.flush();
-     recordPosition(rowIndexPosition);
-   }
-
-   /** Records the parent's positions, then value stream, then scale. */
-   @Override
-   void recordPosition(PositionRecorder recorder) throws IOException {
-     super.recordPosition(recorder);
-     valueStream.getPosition(recorder);
-     scaleStream.getPosition(recorder);
-   }
- }
-
- private static class StructTreeWriter extends TreeWriter {
- private final List<? extends StructField> fields;
- StructTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- List<TypeDescription> children = schema.getChildren();
- if (inspector != null) {
- StructObjectInspector structObjectInspector =
- (StructObjectInspector) inspector;
- fields = structObjectInspector.getAllStructFieldRefs();
- } else {
- fields = null;
- }
- childrenWriters = new TreeWriter[children.size()];
- for(int i=0; i < childrenWriters.length; ++i) {
- ObjectInspector childOI;
- if (fields != null && i < fields.size()) {
- childOI = fields.get(i).getFieldObjectInspector();
- } else {
- childOI = null;
- }
- childrenWriters[i] = createTreeWriter(
- childOI, children.get(i), writer,
- true);
- }
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- StructObjectInspector insp = (StructObjectInspecto
<TRUNCATED>