You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/06/16 18:29:58 UTC
[1/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.
Repository: orc
Updated Branches:
refs/heads/master 8b103da92 -> ded204a4a
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
new file mode 100644
index 0000000..ea4e0e6
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+
+/**
+ * The writers for the specific writers of each type. This provides
+ * the generic API that they must all implement.
+ */
+public interface TreeWriter {
+
+ /**
+ * Estimate the memory currently used to buffer the stripe.
+ * @return the number of bytes
+ */
+ long estimateMemory();
+
+ /**
+ * Estimate the memory used if the file was read into Hive's Writable
+ * types. This is used as an estimate for the query optimizer.
+ * @return the number of bytes
+ */
+ long getRawDataSize();
+
+ /**
+ * Write a VectorizedRowBath to the file. This is called by the WriterImpl
+ * at the top level.
+ * @param batch the list of all of the columns
+ * @param offset the first row from the batch to write
+ * @param length the number of rows to write
+ */
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException;
+
+ /**
+ * Write a ColumnVector to the file. This is called recursively by
+ * writeRootBatch.
+ * @param vector the data to write
+ * @param offset the first value offset to write.
+ * @param length the number of values to write
+ */
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException;
+
+ /**
+ * Create a row index entry at the current point in the stripe.
+ */
+ void createRowIndexEntry() throws IOException;
+
+ /**
+ * Write the stripe out to the file.
+ * @param stripeFooter the stripe footer that contains the information about the
+ * layout of the stripe. The TreeWriterBase is required to update
+ * the footer with its information.
+ * @param stats the stripe statistics information
+ * @param requiredIndexEntries the number of index entries that are
+ * required. this is to check to make sure the
+ * row index is well formed.
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException;
+
+ /**
+ * During a stripe append, we need to update the file statistics.
+ * @param stripeStatistics the statistics for the new stripe
+ */
+ void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
+
+ /**
+ * Add the file statistics to the file footer.
+ * @param footer the file footer builder
+ */
+ void writeFileStatistics(OrcProto.Footer.Builder footer);
+
+ public class Factory {
+ public static TreeWriter create(TypeDescription schema,
+ WriterContext streamFactory,
+ boolean nullable) throws IOException {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case BYTE:
+ return new ByteTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case FLOAT:
+ return new FloatTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case DOUBLE:
+ return new DoubleTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case STRING:
+ return new StringTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case CHAR:
+ return new CharTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case VARCHAR:
+ return new VarcharTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case BINARY:
+ return new BinaryTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case TIMESTAMP:
+ return new TimestampTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case DATE:
+ return new DateTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case DECIMAL:
+ return new DecimalTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case STRUCT:
+ return new StructTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case MAP:
+ return new MapTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case LIST:
+ return new ListTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ case UNION:
+ return new UnionTreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad category: " +
+ schema.getCategory());
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
new file mode 100644
index 0000000..5cfde07
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriterBase.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.ColumnStatisticsImpl;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.RunLengthIntegerWriter;
+import org.apache.orc.impl.RunLengthIntegerWriterV2;
+import org.apache.orc.impl.StreamName;
+import org.apache.orc.util.BloomFilter;
+import org.apache.orc.util.BloomFilterIO;
+import org.apache.orc.util.BloomFilterUtf8;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TimeZone;
+
+/**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have children tree writers that write the children
+ * types.
+ *
+ * This base class maintains the PRESENT stream, the row-group, stripe, and
+ * file level statistics, the row index, and the bloom filters. Subclasses
+ * write the values themselves and extend recordPosition with their own
+ * streams.
+ */
+public abstract class TreeWriterBase implements TreeWriter {
+  protected final int id;
+  // writes the PRESENT stream; null when the column is not nullable
+  protected final BitFieldWriter isPresent;
+  private final boolean isCompressed;
+  // statistics for the current row group, the current stripe, and the file
+  protected final ColumnStatisticsImpl indexStatistics;
+  protected final ColumnStatisticsImpl stripeColStatistics;
+  protected final ColumnStatisticsImpl fileStatistics;
+  // the row index state; all three are null when no index is being built
+  protected final RowIndexPositionRecorder rowIndexPosition;
+  private final OrcProto.RowIndex.Builder rowIndex;
+  private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+  // bloomFilter is only used for the ORIGINAL bloom filter version;
+  // bloomFilterUtf8 is used whenever createBloomFilter is true
+  protected final BloomFilter bloomFilter;
+  protected final BloomFilterUtf8 bloomFilterUtf8;
+  protected final boolean createBloomFilter;
+  private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+  private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
+  protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
+  // whether a null has been seen in the current stripe
+  private boolean foundNulls;
+  private OutStream isPresentOutStream;
+  private final WriterContext streamFactory;
+
+  /**
+   * Create a tree writer.
+   * @param columnId the column id of the column to write
+   * @param schema the row schema
+   * @param streamFactory limited access to the Writer's data.
+   * @param nullable can the value be null?
+   */
+  TreeWriterBase(int columnId,
+                 TypeDescription schema,
+                 WriterContext streamFactory,
+                 boolean nullable) throws IOException {
+    this.streamFactory = streamFactory;
+    this.isCompressed = streamFactory.isCompressed();
+    this.id = columnId;
+    if (nullable) {
+      isPresentOutStream = streamFactory.createStream(id,
+          OrcProto.Stream.Kind.PRESENT);
+      isPresent = new BitFieldWriter(isPresentOutStream, 1);
+    } else {
+      isPresent = null;
+    }
+    this.foundNulls = false;
+    createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+    indexStatistics = ColumnStatisticsImpl.create(schema);
+    stripeColStatistics = ColumnStatisticsImpl.create(schema);
+    fileStatistics = ColumnStatisticsImpl.create(schema);
+    if (streamFactory.buildIndex()) {
+      rowIndex = OrcProto.RowIndex.newBuilder();
+      rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+      rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+    } else {
+      rowIndex = null;
+      rowIndexEntry = null;
+      rowIndexPosition = null;
+    }
+    if (createBloomFilter) {
+      bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+      if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
+        bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
+            streamFactory.getBloomFilterFPP());
+        bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+      } else {
+        bloomFilter = null;
+        bloomFilterIndex = null;
+      }
+      bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
+          streamFactory.getBloomFilterFPP());
+      bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
+    } else {
+      bloomFilterEntry = null;
+      bloomFilterIndex = null;
+      bloomFilterIndexUtf8 = null;
+      bloomFilter = null;
+      bloomFilterUtf8 = null;
+    }
+  }
+
+  protected OrcProto.RowIndex.Builder getRowIndex() {
+    return rowIndex;
+  }
+
+  protected ColumnStatisticsImpl getStripeStatistics() {
+    return stripeColStatistics;
+  }
+
+  protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+    return rowIndexEntry;
+  }
+
+  /**
+   * Create the run length integer writer for a stream.
+   * @param output the stream to write to
+   * @param signed whether the values are signed
+   * @param isDirectV2 true to use RLE version 2, false for version 1
+   * @param writer the writer context; its encoding strategy picks whether
+   *               RLE v2 uses aligned bit packing
+   * @return the integer writer
+   */
+  IntegerWriter createIntegerWriter(PositionedOutputStream output,
+                                    boolean signed, boolean isDirectV2,
+                                    WriterContext writer) {
+    if (isDirectV2) {
+      boolean alignedBitpacking =
+          writer.getEncodingStrategy() == OrcFile.EncodingStrategy.SPEED;
+      return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+    } else {
+      return new RunLengthIntegerWriter(output, signed);
+    }
+  }
+
+  /**
+   * @return true unless the writer is producing the 0.11 file version
+   */
+  boolean isNewWriteFormat(WriterContext writer) {
+    return writer.getVersion() != OrcFile.Version.V_0_11;
+  }
+
+  /**
+   * Handle the top level object write.
+   *
+   * This default method is used for all types except structs, which are the
+   * typical case. VectorizedRowBatch assumes the top level object is a
+   * struct, so we use the first column for all other types.
+   * @param batch the batch to write from
+   * @param offset the row to start on
+   * @param length the number of rows to write
+   */
+  @Override
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    writeBatch(batch.cols[0], offset, length);
+  }
+
+  /**
+   * Write the values from the given vector from offset for length elements.
+   * The base implementation only writes the PRESENT stream and updates the
+   * value count and null flag of the row-group statistics; subclasses call
+   * it first and then write the values.
+   * @param vector the vector to write from
+   * @param offset the first value from the vector to write
+   * @param length the number of values from the vector to write
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    if (vector.noNulls) {
+      indexStatistics.increment(length);
+      if (isPresent != null) {
+        for (int i = 0; i < length; ++i) {
+          isPresent.write(1);
+        }
+      }
+    } else {
+      if (vector.isRepeating) {
+        boolean isNull = vector.isNull[0];
+        if (isPresent != null) {
+          for (int i = 0; i < length; ++i) {
+            isPresent.write(isNull ? 0 : 1);
+          }
+        }
+        if (isNull) {
+          foundNulls = true;
+          indexStatistics.setNull();
+        } else {
+          indexStatistics.increment(length);
+        }
+      } else {
+        // count the number of non-null values
+        int nonNullCount = 0;
+        for (int i = 0; i < length; ++i) {
+          boolean isNull = vector.isNull[i + offset];
+          if (!isNull) {
+            nonNullCount += 1;
+          }
+          if (isPresent != null) {
+            isPresent.write(isNull ? 0 : 1);
+          }
+        }
+        indexStatistics.increment(nonNullCount);
+        if (nonNullCount != length) {
+          foundNulls = true;
+          indexStatistics.setNull();
+        }
+      }
+    }
+  }
+
+  /**
+   * Drop the leading PRESENT stream positions from every row index entry.
+   * Used when the PRESENT stream is suppressed for a stripe.
+   */
+  private void removeIsPresentPositions() {
+    for (int i = 0; i < rowIndex.getEntryCount(); ++i) {
+      OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+      List<Long> positions = entry.getPositionsList();
+      // bit streams use 3 positions if uncompressed, 4 if compressed
+      positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+      entry.clearPositions();
+      entry.addAllPositions(positions);
+    }
+  }
+
+  /**
+   * Finish the stripe for this column. Flushes or suppresses the PRESENT
+   * stream, rolls the stripe statistics into the file statistics, adds the
+   * column encoding to the footer, and writes the row index and bloom
+   * filters.
+   * @throws IllegalArgumentException if the row index does not have
+   *         requiredIndexEntries entries
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    if (isPresent != null) {
+      isPresent.flush();
+
+      // if no nulls are found in a stream, then suppress the stream
+      if (!foundNulls) {
+        isPresentOutStream.suppress();
+        // since isPresent bitstream is suppressed, update the index to
+        // remove the positions of the isPresent stream
+        if (rowIndex != null) {
+          removeIsPresentPositions();
+        }
+      }
+    }
+
+    // merge stripe-level column statistics to file statistics and write it to
+    // stripe statistics
+    fileStatistics.merge(stripeColStatistics);
+    stats.addColStats(stripeColStatistics.serialize());
+    stripeColStatistics.reset();
+
+    // reset the flag for next stripe
+    foundNulls = false;
+
+    builder.addColumns(getEncoding());
+    if (rowIndex != null) {
+      if (rowIndex.getEntryCount() != requiredIndexEntries) {
+        throw new IllegalArgumentException("Column has wrong number of " +
+            "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+            requiredIndexEntries);
+      }
+      streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
+      rowIndex.clear();
+      rowIndexEntry.clear();
+    }
+
+    // write the bloom filter to out stream
+    if (bloomFilterIndex != null) {
+      streamFactory.writeBloomFilter(new StreamName(id,
+          OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
+      bloomFilterIndex.clear();
+    }
+    // write the bloom filter to out stream
+    if (bloomFilterIndexUtf8 != null) {
+      streamFactory.writeBloomFilter(new StreamName(id,
+          OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
+      bloomFilterIndexUtf8.clear();
+    }
+  }
+
+  /**
+   * Get the encoding for this column.
+   * @return the information about the encoding of this column
+   */
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder builder =
+        OrcProto.ColumnEncoding.newBuilder()
+            .setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+    if (createBloomFilter) {
+      builder.setBloomEncoding(BloomFilterIO.Encoding.CURRENT.getId());
+    }
+    return builder;
+  }
+
+  /**
+   * Create a row index entry with the previous location and the current
+   * index statistics. Also merges the index statistics into the stripe
+   * statistics before they are cleared, adds the bloom filter entries, and
+   * finally records the start of the next index entry. Compound writers
+   * override this to also create entries for their children.
+   * NOTE(review): assumes the row index is enabled; rowIndex is null
+   * otherwise.
+   */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    stripeColStatistics.merge(indexStatistics);
+    rowIndexEntry.setStatistics(indexStatistics.serialize());
+    indexStatistics.reset();
+    rowIndex.addEntry(rowIndexEntry);
+    rowIndexEntry.clear();
+    addBloomFilterEntry();
+    recordPosition(rowIndexPosition);
+  }
+
+  /**
+   * Serialize the current bloom filters into their indexes and reset them
+   * for the next row group.
+   */
+  void addBloomFilterEntry() {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
+        bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+        bloomFilter.reset();
+      }
+      if (bloomFilterUtf8 != null) {
+        BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
+        bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
+        bloomFilterUtf8.reset();
+      }
+    }
+  }
+
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
+  }
+
+  /**
+   * Record the current position in each of this column's streams.
+   * @param recorder where should the locations be recorded
+   */
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    if (isPresent != null) {
+      isPresent.getPosition(recorder);
+    }
+  }
+
+  /**
+   * Estimate how much memory the writer is consuming. The base
+   * implementation only counts the buffer of the PRESENT stream.
+   * @return the number of bytes.
+   */
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    if (isPresent != null) {
+      result = isPresentOutStream.getBufferSize();
+    }
+    return result;
+  }
+
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    footer.addStatistics(fileStatistics.serialize());
+  }
+
+  /**
+   * Appends stream positions to a row index entry builder.
+   */
+  static class RowIndexPositionRecorder implements PositionRecorder {
+    private final OrcProto.RowIndexEntry.Builder builder;
+
+    RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+      this.builder = builder;
+    }
+
+    @Override
+    public void addPosition(long position) {
+      builder.addPositions(position);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
new file mode 100644
index 0000000..5047f01
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/UnionTreeWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.util.List;
+
+public class UnionTreeWriter extends TreeWriterBase {
+ private final RunLengthByteWriter tags;
+ private final TreeWriter[] childrenWriters;
+
+ UnionTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ List<TypeDescription> children = schema.getChildren();
+ childrenWriters = new TreeWriterBase[children.size()];
+ for (int i = 0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] = Factory.create(children.get(i), writer, true);
+ }
+ tags =
+ new RunLengthByteWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.DATA));
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ UnionColumnVector vec = (UnionColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte tag = (byte) vec.tags[0];
+ for (int i = 0; i < length; ++i) {
+ tags.write(tag);
+ }
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(tag);
+ }
+ bloomFilterUtf8.addLong(tag);
+ }
+ childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+ }
+ } else {
+ // write the records in runs of the same tag
+ int[] currentStart = new int[vec.fields.length];
+ int[] currentLength = new int[vec.fields.length];
+ for (int i = 0; i < length; ++i) {
+ // only need to deal with the non-nulls, since the nulls were dealt
+ // with in the super method.
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ byte tag = (byte) vec.tags[offset + i];
+ tags.write(tag);
+ if (currentLength[tag] == 0) {
+ // start a new sequence
+ currentStart[tag] = i + offset;
+ currentLength[tag] = 1;
+ } else if (currentStart[tag] + currentLength[tag] == i + offset) {
+ // ok, we are extending the current run for that tag.
+ currentLength[tag] += 1;
+ } else {
+ // otherwise, we need to close off the old run and start a new one
+ childrenWriters[tag].writeBatch(vec.fields[tag],
+ currentStart[tag], currentLength[tag]);
+ currentStart[tag] = i + offset;
+ currentLength[tag] = 1;
+ }
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(tag);
+ }
+ bloomFilterUtf8.addLong(tag);
+ }
+ }
+ }
+ // write out any left over sequences
+ for (int tag = 0; tag < currentStart.length; ++tag) {
+ if (currentLength[tag] != 0) {
+ childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag],
+ currentLength[tag]);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void createRowIndexEntry() throws IOException {
+ super.createRowIndexEntry();
+ for (TreeWriter child : childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ tags.flush();
+ for (TreeWriter child : childrenWriters) {
+ child.writeStripe(builder, stats, requiredIndexEntries);
+ }
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ tags.getPosition(recorder);
+ }
+
+ @Override
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ super.updateFileStatistics(stats);
+ for (TreeWriter child : childrenWriters) {
+ child.updateFileStatistics(stats);
+ }
+ }
+
+ @Override
+ public long estimateMemory() {
+ long children = 0;
+ for (TreeWriter writer : childrenWriters) {
+ children += writer.estimateMemory();
+ }
+ return children + super.estimateMemory() + tags.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() {
+ long result = 0;
+ for (TreeWriter writer : childrenWriters) {
+ result += writer.getRawDataSize();
+ }
+ return result;
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+ super.writeFileStatistics(footer);
+ for (TreeWriter child : childrenWriters) {
+ child.writeFileStatistics(footer);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
new file mode 100644
index 0000000..17d3f61
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/VarcharTreeWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Under the covers, varchar is written to ORC the same way as string.
+ * Values with more characters than the declared maximum length are
+ * truncated to that many characters before they are written.
+ */
+public class VarcharTreeWriter extends StringBaseTreeWriter {
+  private final int maxLength;
+
+  VarcharTreeWriter(int columnId,
+                    TypeDescription schema,
+                    WriterContext writer,
+                    boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    maxLength = schema.getMaxLength();
+  }
+
+  /**
+   * Compute how many bytes of a UTF-8 encoded value to keep so that at most
+   * maxChars characters (code points) remain. Truncating by byte count
+   * instead would split multi-byte characters, producing invalid UTF-8,
+   * and would drop characters that fit within the declared length.
+   * @param bytes the buffer holding the value
+   * @param start the offset of the value in the buffer
+   * @param length the number of bytes in the value
+   * @param maxChars the maximum number of characters to keep
+   * @return the number of bytes to write, never more than length
+   */
+  private static int truncatedLength(byte[] bytes, int start, int length,
+                                     int maxChars) {
+    // every character takes at least one byte, so short values always fit
+    if (length <= maxChars) {
+      return length;
+    }
+    int chars = 0;
+    for (int i = 0; i < length; ++i) {
+      // any byte that is not a continuation byte (10xxxxxx) starts a
+      // new character
+      if ((bytes[start + i] & 0xC0) != 0x80) {
+        if (chars == maxChars) {
+          return i;
+        }
+        chars += 1;
+      }
+    }
+    return length;
+  }
+
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        int itemLength = truncatedLength(vec.vector[0], vec.start[0],
+            vec.length[0], maxLength);
+        if (useDictionaryEncoding) {
+          int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+          for (int i = 0; i < length; ++i) {
+            rows.add(id);
+          }
+        } else {
+          for (int i = 0; i < length; ++i) {
+            directStreamOutput.write(vec.vector[0], vec.start[0],
+                itemLength);
+            lengthOutput.write(itemLength);
+          }
+        }
+        indexStatistics.updateString(vec.vector[0], vec.start[0],
+            itemLength, length);
+        if (createBloomFilter) {
+          if (bloomFilter != null) {
+            // decode the UTF-8 bytes into a Java String for the original
+            // bloom filter
+            bloomFilter.addString(new String(vec.vector[0],
+                vec.start[0], itemLength,
+                StandardCharsets.UTF_8));
+          }
+          bloomFilterUtf8.addBytes(vec.vector[0],
+              vec.start[0], itemLength);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          int itemLength = truncatedLength(vec.vector[offset + i],
+              vec.start[offset + i], vec.length[offset + i], maxLength);
+          if (useDictionaryEncoding) {
+            rows.add(dictionary.add(vec.vector[offset + i],
+                vec.start[offset + i], itemLength));
+          } else {
+            directStreamOutput.write(vec.vector[offset + i],
+                vec.start[offset + i], itemLength);
+            lengthOutput.write(itemLength);
+          }
+          indexStatistics.updateString(vec.vector[offset + i],
+              vec.start[offset + i], itemLength, 1);
+          if (createBloomFilter) {
+            if (bloomFilter != null) {
+              // decode the UTF-8 bytes into a Java String for the original
+              // bloom filter
+              bloomFilter.addString(new String(vec.vector[offset + i],
+                  vec.start[offset + i], itemLength,
+                  StandardCharsets.UTF_8));
+            }
+            bloomFilterUtf8.addBytes(vec.vector[offset + i],
+                vec.start[offset + i], itemLength);
+          }
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
new file mode 100644
index 0000000..f11d519
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.StreamName;
+
+import java.io.IOException;
+
+/**
+ * The limited view of the file writer that the TreeWriters use to create
+ * their streams and to read the writer's options.
+ */
+public interface WriterContext {
+
+  /**
+   * Create a stream to store part of a column.
+   * @param column the column id for the stream
+   * @param kind the kind of stream
+   * @return The output outStream that the section needs to be written to.
+   */
+  OutStream createStream(int column,
+                         OrcProto.Stream.Kind kind
+                         ) throws IOException;
+
+  /**
+   * Get the stride of the row index.
+   * @return the row index stride, which is also used to size the bloom
+   *         filters
+   */
+  int getRowIndexStride();
+
+  /**
+   * Should the writer build the row index?
+   * @return true if we are building the index
+   */
+  boolean buildIndex();
+
+  /**
+   * Is the ORC file compressed?
+   * @return are the streams compressed
+   */
+  boolean isCompressed();
+
+  /**
+   * Get the encoding strategy to use.
+   * @return encoding strategy
+   */
+  OrcFile.EncodingStrategy getEncodingStrategy();
+
+  /**
+   * Get the bloom filter columns.
+   * @return a flag per column id; true if that column gets a bloom filter
+   */
+  boolean[] getBloomFilterColumns();
+
+  /**
+   * Get the target false positive rate for the bloom filters.
+   * @return fpp
+   */
+  double getBloomFilterFPP();
+
+  /**
+   * Get the writer's configuration.
+   * @return configuration
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Get the version of the file to write.
+   * @return the file version
+   */
+  OrcFile.Version getVersion();
+
+  /**
+   * Get the version of the bloom filters to write.
+   * @return the bloom filter version
+   */
+  OrcFile.BloomFilterVersion getBloomFilterVersion();
+
+  /**
+   * Write a row index to the given stream.
+   * @param name the column and kind of the stream to write
+   * @param index the row index to write
+   */
+  void writeIndex(StreamName name,
+                  OrcProto.RowIndex.Builder index) throws IOException;
+
+  /**
+   * Write a bloom filter index to the given stream.
+   * @param name the column and kind of the stream to write
+   * @param bloom the bloom filter index to write
+   */
+  void writeBloomFilter(StreamName name,
+                        OrcProto.BloomFilterIndex.Builder bloom
+                        ) throws IOException;
+}
[4/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.
Posted by om...@apache.org.
ORC-194. Split TreeWriters out of WriterImpl.
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/ded204a4
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/ded204a4
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/ded204a4
Branch: refs/heads/master
Commit: ded204a4a10bfad1ed739fc98f612a41005640c5
Parents: 8b103da
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 17 16:12:01 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Fri Jun 16 11:29:28 2017 -0700
----------------------------------------------------------------------
.../org/apache/orc/impl/TreeReaderFactory.java | 6 +-
.../java/org/apache/orc/impl/WriterImpl.java | 2639 +-----------------
.../orc/impl/writer/BinaryTreeWriter.java | 137 +
.../orc/impl/writer/BooleanTreeWriter.java | 99 +
.../apache/orc/impl/writer/ByteTreeWriter.java | 109 +
.../apache/orc/impl/writer/CharTreeWriter.java | 122 +
.../apache/orc/impl/writer/DateTreeWriter.java | 124 +
.../orc/impl/writer/DecimalTreeWriter.java | 142 +
.../orc/impl/writer/DoubleTreeWriter.java | 112 +
.../apache/orc/impl/writer/FloatTreeWriter.java | 113 +
.../orc/impl/writer/IntegerTreeWriter.java | 127 +
.../apache/orc/impl/writer/ListTreeWriter.java | 162 ++
.../apache/orc/impl/writer/MapTreeWriter.java | 173 ++
.../orc/impl/writer/StringBaseTreeWriter.java | 288 ++
.../orc/impl/writer/StringTreeWriter.java | 93 +
.../orc/impl/writer/StructTreeWriter.java | 156 ++
.../orc/impl/writer/TimestampTreeWriter.java | 165 ++
.../org/apache/orc/impl/writer/TreeWriter.java | 160 ++
.../apache/orc/impl/writer/TreeWriterBase.java | 374 +++
.../apache/orc/impl/writer/UnionTreeWriter.java | 176 ++
.../orc/impl/writer/VarcharTreeWriter.java | 103 +
.../apache/orc/impl/writer/WriterContext.java | 95 +
22 files changed, 3161 insertions(+), 2514 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 4b369af..7e5c452 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.orc.TypeDescription;
import org.apache.orc.OrcProto;
+import org.apache.orc.impl.writer.TimestampTreeWriter;
/**
* Factory for creating ORC tree readers.
@@ -938,7 +939,8 @@ public class TreeReaderFactory {
threadLocalDateFormat.get().setTimeZone(writerTimeZone);
try {
long epoch = threadLocalDateFormat.get()
- .parse(WriterImpl.BASE_TIMESTAMP_STRING).getTime() / WriterImpl.MILLIS_PER_SECOND;
+ .parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
+ TimestampTreeWriter.MILLIS_PER_SECOND;
baseTimestampMap.put(timeZoneId, epoch);
return epoch;
} catch (ParseException e) {
@@ -977,7 +979,7 @@ public class TreeReaderFactory {
if (millis < 0 && newNanos != 0) {
millis -= 1;
}
- millis *= WriterImpl.MILLIS_PER_SECOND;
+ millis *= TimestampTreeWriter.MILLIS_PER_SECOND;
long offset = 0;
// If reader and writer time zones have different rules, adjust the timezone difference
// between reader and writer taking day light savings into account.
[2/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.
Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
new file mode 100644
index 0000000..5835b5a
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BinaryTreeWriter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class BinaryTreeWriter extends TreeWriterBase {
+ private final PositionedOutputStream stream;
+ private final IntegerWriter length;
+ private boolean isDirectV2 = true;
+
+ public BinaryTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.length = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ OrcProto.ColumnEncoding.Builder getEncoding() {
+ OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+ if (isDirectV2) {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+ } else {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+ }
+ return result;
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ for (int i = 0; i < length; ++i) {
+ stream.write(vec.vector[0], vec.start[0],
+ vec.length[0]);
+ this.length.write(vec.length[0]);
+ }
+ indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+ vec.length[0], length);
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ }
+ } else {
+ for (int i = 0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ stream.write(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ this.length.write(vec.length[offset + i]);
+ indexStatistics.updateBinary(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i], 1);
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ bloomFilterUtf8.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ stream.flush();
+ length.flush();
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ length.getPosition(recorder);
+ }
+
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + stream.getBufferSize() +
+ length.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() {
+ // get total length of binary blob
+ BinaryColumnStatistics bcs = (BinaryColumnStatistics) fileStatistics;
+ return bcs.getSum();
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
new file mode 100644
index 0000000..5f572bd
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/BooleanTreeWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.BitFieldWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+/**
+ * Writes BOOLEAN columns through a {@link BitFieldWriter} on the DATA
+ * stream. Every non-null value is written as 0 (zero) or 1 (non-zero).
+ * No bloom filter entries are produced for this type.
+ */
+public class BooleanTreeWriter extends TreeWriterBase {
+  private final BitFieldWriter writer;
+
+  public BooleanTreeWriter(int columnId,
+                           TypeDescription schema,
+                           WriterContext writer,
+                           boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    PositionedOutputStream dataStream =
+        writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    this.writer = new BitFieldWriter(dataStream, 1);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows starting at {@code offset}; a repeating vector
+   * contributes row 0 {@code length} times. Null rows are skipped.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector longs = (LongColumnVector) vector;
+    if (!vector.isRepeating) {
+      for (int row = offset; row < offset + length; ++row) {
+        if (!longs.noNulls && longs.isNull[row]) {
+          continue;
+        }
+        int bit = longs.vector[row] != 0 ? 1 : 0;
+        writer.write(bit);
+        indexStatistics.updateBoolean(bit == 1, 1);
+      }
+    } else if (vector.noNulls || !vector.isNull[0]) {
+      int bit = longs.vector[0] != 0 ? 1 : 0;
+      indexStatistics.updateBoolean(bit == 1, length);
+      for (int n = 0; n < length; ++n) {
+        writer.write(bit);
+      }
+    }
+  }
+
+  /**
+   * Flushes the bit writer at the end of a stripe and, when a row index is
+   * kept, records the starting position for the next stripe.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition == null) {
+      return;
+    }
+    recordPosition(rowIndexPosition);
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    long own = writer.estimateMemory();
+    return super.estimateMemory() + own;
+  }
+
+  /** Estimates raw size as value count times JavaDataModel's primitive1 size. */
+  @Override
+  public long getRawDataSize() {
+    return fileStatistics.getNumberOfValues() *
+        JavaDataModel.get().primitive1();
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
new file mode 100644
index 0000000..edd6411
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ByteTreeWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthByteWriter;
+
+import java.io.IOException;
+
+public class ByteTreeWriter extends TreeWriterBase {
+ private final RunLengthByteWriter writer;
+
+ public ByteTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.writer = new RunLengthByteWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA));
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte value = (byte) vec.vector[0];
+ indexStatistics.updateInteger(value, length);
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
+ }
+ for (int i = 0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for (int i = 0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ byte value = (byte) vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateInteger(value, 1);
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(value);
+ }
+ bloomFilterUtf8.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ writer.flush();
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + writer.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() {
+ long num = fileStatistics.getNumberOfValues();
+ return num * JavaDataModel.get().primitive1();
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
new file mode 100644
index 0000000..92a6bab
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/CharTreeWriter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Under the covers, char is written to ORC the same way as string.
+ *
+ * <p>Every non-null value is stored with exactly {@code maxLength}
+ * characters. Longer values are truncated and shorter values are
+ * right-padded with spaces. Lengths are counted in UTF-8 characters rather
+ * than bytes. This means a multi-byte character is never split, and non-ASCII
+ * values are padded to the declared number of characters. For ASCII data the
+ * output is the same as a byte-based truncate/pad.
+ */
+public class CharTreeWriter extends StringBaseTreeWriter {
+  // Declared length of the char type, in characters.
+  private final int itemLength;
+  // Scratch buffer holding a padded copy of the current value. A padded value
+  // containing multi-byte characters needs more than itemLength bytes, so the
+  // buffer grows on demand.
+  private byte[] padding;
+
+  CharTreeWriter(int columnId,
+                 TypeDescription schema,
+                 WriterContext writer,
+                 boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    itemLength = schema.getMaxLength();
+    padding = new byte[itemLength];
+  }
+
+  /**
+   * Writes {@code length} rows starting at {@code offset}; a repeating vector
+   * contributes row 0 {@code length} times. Null rows are skipped.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    BytesColumnVector vec = (BytesColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        writePaddedValue(vec, 0, length);
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          writePaddedValue(vec, offset + i, 1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Truncates or pads one value to {@code itemLength} characters and writes
+   * it {@code repeats} times, updating the statistics and bloom filters.
+   *
+   * @param vec the source vector
+   * @param row the row of {@code vec} that holds the value
+   * @param repeats how many consecutive rows carry this value
+   */
+  private void writePaddedValue(BytesColumnVector vec, int row,
+                                int repeats) throws IOException {
+    final byte[] source = vec.vector[row];
+    final int sourceStart = vec.start[row];
+    final int sourceLength = vec.length[row];
+    final int charLength = utf8CharLength(source, sourceStart, sourceLength);
+    final byte[] ptr;
+    final int ptrOffset;
+    final int ptrLength;
+    if (charLength >= itemLength) {
+      // Long enough already: keep only the bytes of the first itemLength
+      // characters, without copying.
+      ptr = source;
+      ptrOffset = sourceStart;
+      ptrLength = utf8TruncatedLength(itemLength, source, sourceStart,
+          sourceLength);
+    } else {
+      // Too short: copy into the scratch buffer and append one space per
+      // missing character.
+      ptrLength = sourceLength + (itemLength - charLength);
+      if (padding.length < ptrLength) {
+        padding = new byte[ptrLength];
+      }
+      ptr = padding;
+      ptrOffset = 0;
+      System.arraycopy(source, sourceStart, ptr, 0, sourceLength);
+      Arrays.fill(ptr, sourceLength, ptrLength, (byte) ' ');
+    }
+    if (useDictionaryEncoding) {
+      int dictId = dictionary.add(ptr, ptrOffset, ptrLength);
+      for (int i = 0; i < repeats; ++i) {
+        rows.add(dictId);
+      }
+    } else {
+      for (int i = 0; i < repeats; ++i) {
+        directStreamOutput.write(ptr, ptrOffset, ptrLength);
+        lengthOutput.write(ptrLength);
+      }
+    }
+    indexStatistics.updateString(ptr, ptrOffset, ptrLength, repeats);
+    if (createBloomFilter) {
+      // The bloom filters receive the value as it appears in the vector,
+      // before truncation or padding.
+      if (bloomFilter != null) {
+        // decode the UTF-8 bytes into a Java String
+        bloomFilter.addString(new String(source, sourceStart, sourceLength,
+            StandardCharsets.UTF_8));
+      }
+      bloomFilterUtf8.addBytes(source, sourceStart, sourceLength);
+    }
+  }
+
+  /**
+   * Counts the UTF-8 characters in a byte range as the number of bytes that
+   * are not continuation bytes (bit pattern 10xxxxxx).
+   */
+  private static int utf8CharLength(byte[] bytes, int start, int length) {
+    int chars = 0;
+    for (int i = start; i < start + length; ++i) {
+      if ((bytes[i] & 0xC0) != 0x80) {
+        chars += 1;
+      }
+    }
+    return chars;
+  }
+
+  /**
+   * Returns how many bytes of the range cover its first {@code maxChars}
+   * UTF-8 characters. The cut is placed just before the start of character
+   * number {@code maxChars + 1}, so no character is split. If the range has
+   * at most {@code maxChars} characters, {@code length} is returned.
+   */
+  private static int utf8TruncatedLength(int maxChars, byte[] bytes,
+                                         int start, int length) {
+    int chars = 0;
+    for (int i = start; i < start + length; ++i) {
+      if ((bytes[i] & 0xC0) != 0x80) {
+        if (chars == maxChars) {
+          return i - start;
+        }
+        chars += 1;
+      }
+    }
+    return length;
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
new file mode 100644
index 0000000..d15fb13
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DateTreeWriter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+/**
+ * Writes DATE columns. Every non-null value of the LongColumnVector is
+ * narrowed to {@code int} and written to the DATA stream through the writer
+ * returned by {@code createIntegerWriter}.
+ */
+public class DateTreeWriter extends TreeWriterBase {
+  private final IntegerWriter writer;
+  private final boolean isDirectV2;
+
+  public DateTreeWriter(int columnId,
+                        TypeDescription schema,
+                        WriterContext writer,
+                        boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    OutStream dataStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows starting at {@code offset}; a repeating vector
+   * contributes row 0 {@code length} times. Null rows are skipped.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    LongColumnVector dates = (LongColumnVector) vector;
+    if (!vector.isRepeating) {
+      for (int row = offset; row < offset + length; ++row) {
+        if (dates.noNulls || !dates.isNull[row]) {
+          int value = (int) dates.vector[row];
+          writer.write(value);
+          updateStatsAndBloomFilters(value);
+        }
+      }
+      return;
+    }
+    if (vector.noNulls || !vector.isNull[0]) {
+      int value = (int) dates.vector[0];
+      updateStatsAndBloomFilters(value);
+      for (int remaining = length; remaining > 0; --remaining) {
+        writer.write(value);
+      }
+    }
+  }
+
+  /** Feeds one value to the index statistics and the enabled bloom filters. */
+  private void updateStatsAndBloomFilters(int value) {
+    indexStatistics.updateDate(value);
+    if (!createBloomFilter) {
+      return;
+    }
+    if (bloomFilter != null) {
+      bloomFilter.addLong(value);
+    }
+    bloomFilterUtf8.addLong(value);
+  }
+
+  /**
+   * Flushes the integer writer at the end of a stripe and, when a row index
+   * is kept, records the starting position for the next stripe.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    writer.flush();
+    if (rowIndexPosition == null) {
+      return;
+    }
+    recordPosition(rowIndexPosition);
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    writer.getPosition(recorder);
+  }
+
+  /** Reports DIRECT_V2 when the new write format is in use, else DIRECT. */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+    result.setKind(isDirectV2
+        ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+        : OrcProto.ColumnEncoding.Kind.DIRECT);
+    return result;
+  }
+
+  @Override
+  public long estimateMemory() {
+    long own = writer.estimateMemory();
+    return super.estimateMemory() + own;
+  }
+
+  /** Estimates raw size as value count times JavaDataModel's date length. */
+  @Override
+  public long getRawDataSize() {
+    long values = fileStatistics.getNumberOfValues();
+    return values * JavaDataModel.get().lengthOfDate();
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
new file mode 100644
index 0000000..0428253
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DecimalTreeWriter.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+
+import java.io.IOException;
+
+public class DecimalTreeWriter extends TreeWriterBase {
+ private final PositionedOutputStream valueStream;
+
+ // These scratch buffers allow us to serialize decimals much faster.
+ private final long[] scratchLongs;
+ private final byte[] scratchBuffer;
+
+ private final IntegerWriter scaleStream;
+ private final boolean isDirectV2;
+
+ public DecimalTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
+ scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
+ this.scaleStream = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ OrcProto.ColumnEncoding.Builder getEncoding() {
+ OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+ if (isDirectV2) {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+ } else {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+ }
+ return result;
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DecimalColumnVector vec = (DecimalColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ HiveDecimalWritable value = vec.vector[0];
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ String str = value.toString(scratchBuffer);
+ if (bloomFilter != null) {
+ bloomFilter.addString(str);
+ }
+ bloomFilterUtf8.addString(str);
+ }
+ for (int i = 0; i < length; ++i) {
+ value.serializationUtilsWrite(valueStream,
+ scratchLongs);
+ scaleStream.write(value.scale());
+ }
+ }
+ } else {
+ for (int i = 0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ HiveDecimalWritable value = vec.vector[i + offset];
+ value.serializationUtilsWrite(valueStream, scratchLongs);
+ scaleStream.write(value.scale());
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ String str = value.toString(scratchBuffer);
+ if (bloomFilter != null) {
+ bloomFilter.addString(str);
+ }
+ bloomFilterUtf8.addString(str);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ valueStream.flush();
+ scaleStream.flush();
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ valueStream.getPosition(recorder);
+ scaleStream.getPosition(recorder);
+ }
+
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + valueStream.getBufferSize() +
+ scaleStream.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() {
+ return fileStatistics.getNumberOfValues() *
+ JavaDataModel.get().lengthOfDecimal();
+ }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
new file mode 100644
index 0000000..d2c0db2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/DoubleTreeWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+/**
+ * Writes DOUBLE columns: each non-null value is written to the DATA stream
+ * with {@link SerializationUtils#writeDouble}.
+ */
+public class DoubleTreeWriter extends TreeWriterBase {
+  private final PositionedOutputStream stream;
+  private final SerializationUtils utils;
+
+  public DoubleTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.stream = writer.createStream(id,
+        OrcProto.Stream.Kind.DATA);
+    this.utils = new SerializationUtils();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Writes {@code length} rows starting at {@code offset} (row 0 repeated
+   * {@code length} times for a repeating vector). Null rows are skipped.
+   *
+   * <p>The index statistics are updated once per written value in both
+   * branches. A repeating vector therefore yields the same statistics,
+   * including aggregates such as the sum, as the equivalent non-repeating
+   * vector. Previously the repeating branch updated them only once per batch.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    DoubleColumnVector vec = (DoubleColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        double value = vec.vector[0];
+        // Bloom filter membership is unaffected by repetition: add once.
+        addToBloomFilters(value);
+        for (int i = 0; i < length; ++i) {
+          writeValue(value);
+        }
+      }
+    } else {
+      for (int i = 0; i < length; ++i) {
+        if (vec.noNulls || !vec.isNull[i + offset]) {
+          double value = vec.vector[i + offset];
+          writeValue(value);
+          addToBloomFilters(value);
+        }
+      }
+    }
+  }
+
+  /** Serializes one value and folds it into the index statistics. */
+  private void writeValue(double value) throws IOException {
+    utils.writeDouble(stream, value);
+    indexStatistics.updateDouble(value);
+  }
+
+  /** Adds one value to the enabled bloom filters. */
+  private void addToBloomFilters(double value) {
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addDouble(value);
+      }
+      bloomFilterUtf8.addDouble(value);
+    }
+  }
+
+  /**
+   * Flushes the DATA stream at the end of a stripe and, when a row index is
+   * kept, records the starting position for the next stripe.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    stream.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    stream.getPosition(recorder);
+  }
+
+  @Override
+  public long estimateMemory() {
+    return super.estimateMemory() + stream.getBufferSize();
+  }
+
+  /** Estimates raw size as value count times JavaDataModel's primitive2 size. */
+  @Override
+  public long getRawDataSize() {
+    long num = fileStatistics.getNumberOfValues();
+    return num * JavaDataModel.get().primitive2();
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
new file mode 100644
index 0000000..c825bf1
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/FloatTreeWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+
+ /**
+ * Tree writer for FLOAT columns. Values are read from a
+ * {@link DoubleColumnVector}, narrowed to {@code float}, and written to the
+ * DATA stream; column statistics and bloom filters are updated per value.
+ */
+ public class FloatTreeWriter extends TreeWriterBase {
+ private final PositionedOutputStream stream;
+ private final SerializationUtils utils;
+
+ public FloatTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.utils = new SerializationUtils();
+ // capture the starting stream position for the first row group
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /**
+ * Writes {@code length} rows of {@code vector} starting at {@code offset}.
+ * Null rows produce no DATA bytes; presence/count bookkeeping is left to
+ * the preceding super.writeBatch call.
+ */
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DoubleColumnVector vec = (DoubleColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ float value = (float) vec.vector[0];
+ // Re-adding an identical value to a bloom filter changes nothing,
+ // so a single insertion covers the whole repeated batch.
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addDouble(value);
+ }
+ bloomFilterUtf8.addDouble(value);
+ }
+ for (int i = 0; i < length; ++i) {
+ utils.writeFloat(stream, value);
+ // Update statistics once per row, exactly like the non-repeating
+ // branch below. A single call would account for only one of the
+ // `length` rows in per-occurrence statistics such as the sum
+ // (compare updateInteger(value, length) in IntegerTreeWriter).
+ indexStatistics.updateDouble(value);
+ }
+ }
+ } else {
+ for (int i = 0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ float value = (float) vec.vector[i + offset];
+ utils.writeFloat(stream, value);
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addDouble(value);
+ }
+ bloomFilterUtf8.addDouble(value);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Finishes the stripe: base class first, then flush the DATA stream, then
+ * record the post-flush position when row indexes are being built.
+ */
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ stream.flush();
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /** Records base-class positions followed by the DATA stream position. */
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+
+ /** Base-class estimate plus the bytes buffered in the DATA stream. */
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + stream.getBufferSize();
+ }
+
+ /**
+ * Value count times JavaDataModel's primitive1() size (presumably the
+ * 4-byte width appropriate for floats).
+ */
+ @Override
+ public long getRawDataSize() {
+ long num = fileStatistics.getNumberOfValues();
+ return num * JavaDataModel.get().primitive1();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
new file mode 100644
index 0000000..6036ef5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/IntegerTreeWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+
+ /**
+ * Tree writer for integral columns backed by a {@link LongColumnVector}.
+ * Values are written through an {@link IntegerWriter} over the DATA stream;
+ * the column encoding is DIRECT_V2 when isNewWriteFormat(writer) holds and
+ * DIRECT otherwise.
+ */
+ public class IntegerTreeWriter extends TreeWriterBase {
+ private final IntegerWriter writer;
+ private boolean isDirectV2 = true;
+ // selects primitive2() (LONG) vs primitive1() (other categories) in getRawDataSize
+ private final boolean isLong;
+
+ public IntegerTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
+ OutStream dataStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(dataStream, true, isDirectV2, writer);
+ // starting position of the first row group
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /** Base encoding with the kind set to DIRECT_V2 or DIRECT. */
+ @Override
+ OrcProto.ColumnEncoding.Builder getEncoding() {
+ OrcProto.ColumnEncoding.Builder encoding = super.getEncoding();
+ encoding.setKind(isDirectV2
+ ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+ : OrcProto.ColumnEncoding.Kind.DIRECT);
+ return encoding;
+ }
+
+ /**
+ * Writes {@code length} rows starting at {@code offset}. A repeating
+ * vector updates statistics once with a repetition count of
+ * {@code length}; otherwise each non-null row is handled individually.
+ */
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector longs = (LongColumnVector) vector;
+ if (!vector.isRepeating) {
+ for (int i = 0; i < length; ++i) {
+ int row = offset + i;
+ if (longs.noNulls || !longs.isNull[row]) {
+ long v = longs.vector[row];
+ writer.write(v);
+ indexStatistics.updateInteger(v, 1);
+ addLongToBloomFilters(v);
+ }
+ }
+ } else if (vector.noNulls || !vector.isNull[0]) {
+ long v = longs.vector[0];
+ indexStatistics.updateInteger(v, length);
+ addLongToBloomFilters(v);
+ for (int n = 0; n < length; ++n) {
+ writer.write(v);
+ }
+ }
+ }
+
+ /** Adds {@code v} to whichever bloom filters are enabled. */
+ private void addLongToBloomFilters(long v) {
+ if (!createBloomFilter) {
+ return;
+ }
+ if (bloomFilter != null) {
+ bloomFilter.addLong(v);
+ }
+ bloomFilterUtf8.addLong(v);
+ }
+
+ /**
+ * Finishes the stripe: base class first, then flush the integer writer,
+ * then record the post-flush position when row indexes are built.
+ */
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ writer.flush();
+ if (rowIndexPosition == null) {
+ return;
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ /** Base-class positions first, then the integer writer's position. */
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+
+ /** Base-class estimate plus the integer writer's own estimate. */
+ @Override
+ public long estimateMemory() {
+ long inherited = super.estimateMemory();
+ return inherited + writer.estimateMemory();
+ }
+
+ /** Value count times the primitive size chosen by {@code isLong}. */
+ @Override
+ public long getRawDataSize() {
+ JavaDataModel model = JavaDataModel.get();
+ long bytesPerValue = isLong ? model.primitive2() : model.primitive1();
+ return fileStatistics.getNumberOfValues() * bytesPerValue;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
new file mode 100644
index 0000000..2c5bd50
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/ListTreeWriter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.WriterImpl;
+
+import java.io.IOException;
+
+ /**
+ * Tree writer for LIST columns. Each non-null row contributes its element
+ * count to the LENGTH stream; the elements themselves are delegated to a
+ * single child writer, batched into contiguous runs where possible.
+ */
+ public class ListTreeWriter extends TreeWriterBase {
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
+ private final TreeWriter childWriter;
+
+ ListTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ childWriter = Factory.create(schema.getChildren().get(0), writer, true);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ // starting position of the first row group
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /** Base encoding with the kind set to DIRECT_V2 or DIRECT. */
+ @Override
+ OrcProto.ColumnEncoding.Builder getEncoding() {
+ OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+ if (isDirectV2) {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+ } else {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+ }
+ return result;
+ }
+
+ /** Closes the row group for this column and for the element column. */
+ @Override
+ public void createRowIndexEntry() throws IOException {
+ super.createRowIndexEntry();
+ childWriter.createRowIndexEntry();
+ }
+
+ /**
+ * Writes {@code length} list rows starting at {@code offset}. For each
+ * non-null row the list length goes to the LENGTH stream (and bloom
+ * filters), and its elements are forwarded to the child writer.
+ */
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ ListColumnVector vec = (ListColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int childOffset = (int) vec.offsets[0];
+ int childLength = (int) vec.lengths[0];
+ // every row is the same list, so its elements are written once per row
+ for (int i = 0; i < length; ++i) {
+ lengths.write(childLength);
+ childWriter.writeBatch(vec.child, childOffset, childLength);
+ }
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(childLength);
+ }
+ bloomFilterUtf8.addLong(childLength);
+ }
+ }
+ } else {
+ // Write the elements in runs: consecutive lists whose elements are
+ // contiguous in the child vector go to the child writer in one call.
+ int currentOffset = 0;
+ int currentLength = 0;
+ for (int i = 0; i < length; ++i) {
+ // Check noNulls first, as the other writers do: isNull is not
+ // meaningful when noNulls is set (per the ColumnVector contract),
+ // and the base class treats every row as present in that case.
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int nextLength = (int) vec.lengths[offset + i];
+ int nextOffset = (int) vec.offsets[offset + i];
+ lengths.write(nextLength);
+ if (currentLength == 0) {
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else if (currentOffset + currentLength != nextOffset) {
+ childWriter.writeBatch(vec.child, currentOffset,
+ currentLength);
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else {
+ currentLength += nextLength;
+ }
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(nextLength);
+ }
+ bloomFilterUtf8.addLong(nextLength);
+ }
+ }
+ }
+ // flush the final pending run, if any
+ if (currentLength != 0) {
+ childWriter.writeBatch(vec.child, currentOffset,
+ currentLength);
+ }
+ }
+ }
+
+ /**
+ * Finishes the stripe: base class, flush LENGTH, finish the child column,
+ * then record the post-flush position when row indexes are built.
+ */
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ lengths.flush();
+ childWriter.writeStripe(builder, stats, requiredIndexEntries);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /** Base-class positions first, then the LENGTH stream position. */
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+
+ @Override
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ super.updateFileStatistics(stats);
+ childWriter.updateFileStatistics(stats);
+ }
+
+ /** Base estimate plus LENGTH writer and child writer estimates. */
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + lengths.estimateMemory() +
+ childWriter.estimateMemory();
+ }
+
+ /** Only the element data is counted toward the raw size. */
+ @Override
+ public long getRawDataSize() {
+ return childWriter.getRawDataSize();
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+ super.writeFileStatistics(footer);
+ childWriter.writeFileStatistics(footer);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
new file mode 100644
index 0000000..26ace05
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/MapTreeWriter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+
+import java.io.IOException;
+import java.util.List;
+
+ /**
+ * Tree writer for MAP columns. Each non-null row contributes its entry
+ * count to the LENGTH stream; keys and values are delegated to two child
+ * writers, batched into contiguous runs where possible.
+ */
+ public class MapTreeWriter extends TreeWriterBase {
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
+ private final TreeWriter keyWriter;
+ private final TreeWriter valueWriter;
+
+ MapTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ List<TypeDescription> children = schema.getChildren();
+ keyWriter = Factory.create(children.get(0), writer, true);
+ valueWriter = Factory.create(children.get(1), writer, true);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ // starting position of the first row group
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /** Base encoding with the kind set to DIRECT_V2 or DIRECT. */
+ @Override
+ OrcProto.ColumnEncoding.Builder getEncoding() {
+ OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+ if (isDirectV2) {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+ } else {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+ }
+ return result;
+ }
+
+ /** Closes the row group for this column and for both child columns. */
+ @Override
+ public void createRowIndexEntry() throws IOException {
+ super.createRowIndexEntry();
+ keyWriter.createRowIndexEntry();
+ valueWriter.createRowIndexEntry();
+ }
+
+ /**
+ * Writes {@code length} map rows starting at {@code offset}. For each
+ * non-null row the entry count goes to the LENGTH stream (and bloom
+ * filters), and its keys/values are forwarded to the child writers.
+ */
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ MapColumnVector vec = (MapColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int childOffset = (int) vec.offsets[0];
+ int childLength = (int) vec.lengths[0];
+ // every row is the same map, so its entries are written once per row
+ for (int i = 0; i < length; ++i) {
+ lengths.write(childLength);
+ keyWriter.writeBatch(vec.keys, childOffset, childLength);
+ valueWriter.writeBatch(vec.values, childOffset, childLength);
+ }
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(childLength);
+ }
+ bloomFilterUtf8.addLong(childLength);
+ }
+ }
+ } else {
+ // Write the elements in runs: consecutive maps whose entries are
+ // contiguous in the child vectors go to the child writers in one call.
+ int currentOffset = 0;
+ int currentLength = 0;
+ for (int i = 0; i < length; ++i) {
+ // Check noNulls first, as the other writers do: isNull is not
+ // meaningful when noNulls is set (per the ColumnVector contract),
+ // and the base class treats every row as present in that case.
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int nextLength = (int) vec.lengths[offset + i];
+ int nextOffset = (int) vec.offsets[offset + i];
+ lengths.write(nextLength);
+ if (currentLength == 0) {
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else if (currentOffset + currentLength != nextOffset) {
+ keyWriter.writeBatch(vec.keys, currentOffset,
+ currentLength);
+ valueWriter.writeBatch(vec.values, currentOffset,
+ currentLength);
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else {
+ currentLength += nextLength;
+ }
+ if (createBloomFilter) {
+ if (bloomFilter != null) {
+ bloomFilter.addLong(nextLength);
+ }
+ bloomFilterUtf8.addLong(nextLength);
+ }
+ }
+ }
+ // flush the final pending run, if any
+ if (currentLength != 0) {
+ keyWriter.writeBatch(vec.keys, currentOffset,
+ currentLength);
+ valueWriter.writeBatch(vec.values, currentOffset,
+ currentLength);
+ }
+ }
+ }
+
+ /**
+ * Finishes the stripe: base class, flush LENGTH, finish key then value
+ * columns, then record the post-flush position when row indexes are built.
+ */
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ lengths.flush();
+ keyWriter.writeStripe(builder, stats, requiredIndexEntries);
+ valueWriter.writeStripe(builder, stats, requiredIndexEntries);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ /** Base-class positions first, then the LENGTH stream position. */
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+
+ @Override
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ super.updateFileStatistics(stats);
+ keyWriter.updateFileStatistics(stats);
+ valueWriter.updateFileStatistics(stats);
+ }
+
+ /** Base estimate plus LENGTH writer and both child writer estimates. */
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + lengths.estimateMemory() +
+ keyWriter.estimateMemory() + valueWriter.estimateMemory();
+ }
+
+ /** Raw size is the sum of the key and value columns' raw sizes. */
+ @Override
+ public long getRawDataSize() {
+ return keyWriter.getRawDataSize() + valueWriter.getRawDataSize();
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+ super.writeFileStatistics(footer);
+ keyWriter.writeFileStatistics(footer);
+ valueWriter.writeFileStatistics(footer);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
new file mode 100644
index 0000000..f49cb7f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.io.Text;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.DynamicIntArray;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.PositionedOutputStream;
+import org.apache.orc.impl.StringRedBlackTree;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+ * Shared base for string-like column writers. Incoming values are first
+ * collected into a {@link StringRedBlackTree} dictionary, with one dictionary
+ * id per non-null row kept in {@code rows}. When the stripe (or, with stride
+ * checking enabled, a row group) ends, the writer decides between dictionary
+ * encoding and direct encoding based on the ratio of distinct keys to rows.
+ */
+ public abstract class StringBaseTreeWriter extends TreeWriterBase {
+ // initial capacity handed to the StringRedBlackTree dictionary
+ private static final int INITIAL_DICTIONARY_SIZE = 4096;
+ // DICTIONARY_DATA stream: bytes of each dictionary entry in sorted order
+ private final OutStream stringOutput;
+ // LENGTH stream: dictionary entry lengths (dictionary mode) or
+ // per-value lengths (direct mode)
+ protected final IntegerWriter lengthOutput;
+ // integer writer over the DATA stream carrying dictionary ids
+ private final IntegerWriter rowOutput;
+ protected final StringRedBlackTree dictionary =
+ new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+ // dictionary id of every buffered non-null value, in arrival order
+ protected final DynamicIntArray rows = new DynamicIntArray();
+ // DATA stream: raw bytes in direct mode; also wrapped by rowOutput
+ protected final PositionedOutputStream directStreamOutput;
+ // row index entries whose stream positions are filled in at flush time
+ private final List<OrcProto.RowIndexEntry> savedRowIndex =
+ new ArrayList<>();
+ private final boolean buildIndex;
+ // rows.size() at the start of each saved row group; always holds one more
+ // element than savedRowIndex (starts as [0], grows with each saved entry)
+ private final List<Long> rowIndexValueCount = new ArrayList<>();
+ // If the number of keys in a dictionary is greater than this fraction of
+ // the total number of non-null rows, turn off dictionary encoding
+ private final double dictionaryKeySizeThreshold;
+ protected boolean useDictionaryEncoding = true;
+ private boolean isDirectV2 = true;
+ // latched to true by checkDictionaryEncoding; never reset in this class,
+ // so the first encoding decision sticks for the writer's lifetime
+ private boolean doneDictionaryCheck;
+ // when true, the encoding decision is attempted at every row group boundary
+ private final boolean strideDictionaryCheck;
+
+ StringBaseTreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ stringOutput = writer.createStream(id,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ lengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
+ writer);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ // the first row group starts at buffered row 0
+ rowIndexValueCount.add(0L);
+ buildIndex = writer.buildIndex();
+ Configuration conf = writer.getConfiguration();
+ dictionaryKeySizeThreshold =
+ OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+ strideDictionaryCheck =
+ OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+ doneDictionaryCheck = false;
+ }
+
+ /**
+ * Decides once whether to keep dictionary encoding. Without the new write
+ * format (isDirectV2 false) dictionary encoding is always kept.
+ */
+ private void checkDictionaryEncoding() {
+ if (!doneDictionaryCheck) {
+ // Set the flag indicating whether or not to use dictionary encoding
+ // based on whether or not the fraction of distinct keys over number of
+ // non-null rows is less than the configured threshold
+ float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+ useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+ doneDictionaryCheck = true;
+ }
+ }
+
+ /**
+ * Flushes buffered values in the chosen encoding, lets the base class write
+ * its part, flushes this writer's streams, and resets per-stripe state.
+ */
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ // If stride checking is off, or no row group has ended yet, the
+ // dictionary check may not have happened. So do it here.
+ checkDictionaryEncoding();
+
+ if (useDictionaryEncoding) {
+ flushDictionary();
+ } else {
+ // flush out any leftover entries from the dictionary
+ if (rows.size() > 0) {
+ flushDictionary();
+ }
+
+ // suppress the stream for every stripe if dictionary is disabled
+ stringOutput.suppress();
+ }
+
+ // we need to build the rowindex before calling super, since it
+ // writes it out.
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ if (useDictionaryEncoding) {
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ } else {
+ directStreamOutput.flush();
+ lengthOutput.flush();
+ }
+ // reset all of the fields to be ready for the next stripe.
+ dictionary.clear();
+ savedRowIndex.clear();
+ rowIndexValueCount.clear();
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ rowIndexValueCount.add(0L);
+
+ if (!useDictionaryEncoding) {
+ // record the start positions of first index stride of next stripe i.e
+ // beginning of the direct streams when dictionary is disabled
+ recordDirectStreamPosition();
+ }
+ }
+
+ /**
+ * Writes all buffered rows. In dictionary mode the sorted dictionary is
+ * emitted and rows are rewritten as sorted-order ids; in direct mode each
+ * value's bytes and length are written. Saved row index entries are
+ * completed with stream positions as their starting row is reached.
+ * Clears {@code rows} on return.
+ */
+ private void flushDictionary() throws IOException {
+ // dumpOrder[original dictionary id] = id in sorted output order
+ final int[] dumpOrder = new int[dictionary.size()];
+
+ if (useDictionaryEncoding) {
+ // Write the dictionary by traversing the red-black tree writing out
+ // the bytes and lengths; and creating the map from the original order
+ // to the final sorted order.
+
+ dictionary.visit(new StringRedBlackTree.Visitor() {
+ private int currentId = 0;
+
+ @Override
+ public void visit(StringRedBlackTree.VisitorContext context
+ ) throws IOException {
+ context.writeBytes(stringOutput);
+ lengthOutput.write(context.getLength());
+ dumpOrder[context.getOriginalPosition()] = currentId++;
+ }
+ });
+ } else {
+ // for direct encoding, we don't want the dictionary data stream
+ stringOutput.suppress();
+ }
+ int length = rows.size();
+ int rowIndexEntry = 0;
+ OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+ Text text = new Text();
+ // write the values translated into the dump order.
+ // The loop runs one step past the last row (i == length) so that saved
+ // row groups starting exactly at the end still get their positions.
+ for (int i = 0; i <= length; ++i) {
+ // now that we are writing out the row values, we can finalize the
+ // row index
+ if (buildIndex) {
+ // rowIndexValueCount has one more element than savedRowIndex, so
+ // get(rowIndexEntry) stays in range even though it is evaluated
+ // before the size check.
+ while (i == rowIndexValueCount.get(rowIndexEntry) &&
+ rowIndexEntry < savedRowIndex.size()) {
+ OrcProto.RowIndexEntry.Builder base =
+ savedRowIndex.get(rowIndexEntry++).toBuilder();
+ if (useDictionaryEncoding) {
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ } else {
+ PositionRecorder posn = new RowIndexPositionRecorder(base);
+ directStreamOutput.getPosition(posn);
+ lengthOutput.getPosition(posn);
+ }
+ rowIndex.addEntry(base.build());
+ }
+ }
+ if (i != length) {
+ if (useDictionaryEncoding) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ } else {
+ dictionary.getText(text, rows.get(i));
+ directStreamOutput.write(text.getBytes(), 0, text.getLength());
+ lengthOutput.write(text.getLength());
+ }
+ }
+ }
+ rows.clear();
+ }
+
+ /**
+ * Base encoding with the kind set from the dictionary decision and the
+ * write format; the dictionary size is included only in dictionary mode.
+ */
+ @Override
+ OrcProto.ColumnEncoding.Builder getEncoding() {
+ OrcProto.ColumnEncoding.Builder result = super.getEncoding();
+ if (useDictionaryEncoding) {
+ result.setDictionarySize(dictionary.size());
+ if (isDirectV2) {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2);
+ } else {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY);
+ }
+ } else {
+ if (isDirectV2) {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
+ } else {
+ result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * This method doesn't call the super method, because unlike most of the
+ * other TreeWriters, this one can't record the position in the streams
+ * until the stripe is being flushed. Therefore it saves all of the entries
+ * and augments them with the final information as the stripe is written.
+ */
+ @Override
+ public void createRowIndexEntry() throws IOException {
+ getStripeStatistics().merge(indexStatistics);
+ OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ OrcProto.RowIndexEntry base = rowIndexEntry.build();
+ savedRowIndex.add(base);
+ rowIndexEntry.clear();
+ addBloomFilterEntry();
+ // NOTE(review): no null check on rowIndexPosition here, unlike other
+ // call sites; presumably this method only runs when indexes are built
+ recordPosition(rowIndexPosition);
+ // the next row group starts at the current number of buffered rows
+ rowIndexValueCount.add((long) rows.size());
+ if (strideDictionaryCheck) {
+ checkDictionaryEncoding();
+ }
+ if (!useDictionaryEncoding) {
+ if (rows.size() > 0) {
+ flushDictionary();
+ // just record the start positions of next index stride
+ recordDirectStreamPosition();
+ } else {
+ // record the start positions of next index stride
+ recordDirectStreamPosition();
+ getRowIndex().addEntry(base);
+ }
+ }
+ }
+
+ /** Records the DATA and LENGTH positions used by direct encoding. */
+ private void recordDirectStreamPosition() throws IOException {
+ if (rowIndexPosition != null) {
+ directStreamOutput.getPosition(rowIndexPosition);
+ lengthOutput.getPosition(rowIndexPosition);
+ }
+ }
+
+ /**
+ * Dictionary mode counts the dictionary and buffered row ids; direct mode
+ * counts the LENGTH writer and DATA stream buffers.
+ */
+ @Override
+ public long estimateMemory() {
+ long parent = super.estimateMemory();
+ if (useDictionaryEncoding) {
+ return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
+ } else {
+ return parent + lengthOutput.estimateMemory() +
+ directStreamOutput.getBufferSize();
+ }
+ }
+
+ @Override
+ public long getRawDataSize() {
+ // ORC strings are converted to java Strings. so use JavaDataModel to
+ // compute the overall size of strings
+ StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
+ long numVals = fileStatistics.getNumberOfValues();
+ if (numVals == 0) {
+ return 0;
+ } else {
+ int avgSize = (int) (scs.getSum() / numVals);
+ return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
new file mode 100644
index 0000000..ab6f38f
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringTreeWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Tree writer that writes the byte values of a BytesColumnVector, either
+ * through the dictionary (when useDictionaryEncoding is set) or directly to
+ * the byte data and length streams.
+ */
+public class StringTreeWriter extends StringBaseTreeWriter {
+  StringTreeWriter(int columnId,
+                   TypeDescription schema,
+                   WriterContext writer,
+                   boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+  }
+
+  /**
+   * Write {@code length} values of {@code vector} starting at
+   * {@code offset}. super.writeBatch runs first for column-level
+   * bookkeeping; null entries are then skipped. A repeating vector
+   * contributes its first entry {@code length} times.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    final BytesColumnVector bytesVector = (BytesColumnVector) vector;
+    if (!vector.isRepeating) {
+      for (int i = 0; i < length; ++i) {
+        final int row = offset + i;
+        if (bytesVector.noNulls || !bytesVector.isNull[row]) {
+          writeStringValue(bytesVector.vector[row], bytesVector.start[row],
+              bytesVector.length[row], 1);
+        }
+      }
+    } else if (vector.noNulls || !vector.isNull[0]) {
+      writeStringValue(bytesVector.vector[0], bytesVector.start[0],
+          bytesVector.length[0], length);
+    }
+  }
+
+  /**
+   * Write one value {@code count} times.
+   *
+   * With dictionary encoding the value is added to the dictionary once and
+   * its id is appended {@code count} times. Otherwise the bytes and the
+   * length are written {@code count} times. The index statistics are
+   * updated once with a repetition count of {@code count}. When bloom
+   * filters are enabled, the value is added to them once.
+   */
+  private void writeStringValue(byte[] bytes, int start, int len,
+                                int count) throws IOException {
+    if (useDictionaryEncoding) {
+      final int id = dictionary.add(bytes, start, len);
+      for (int r = 0; r < count; ++r) {
+        rows.add(id);
+      }
+    } else {
+      for (int r = 0; r < count; ++r) {
+        directStreamOutput.write(bytes, start, len);
+        lengthOutput.write(len);
+      }
+    }
+    indexStatistics.updateString(bytes, start, len, count);
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        // decode the UTF-8 bytes into a Java String for the original-format
+        // bloom filter
+        bloomFilter.addString(
+            new String(bytes, start, len, StandardCharsets.UTF_8));
+      }
+      bloomFilterUtf8.addBytes(bytes, start, len);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
new file mode 100644
index 0000000..9a1384d
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/StructTreeWriter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tree writer for struct columns. It creates no data streams of its own;
+ * each field is delegated to a child tree writer, and the compound
+ * operations (index entries, stripes, statistics, memory) fan out to every
+ * child.
+ */
+public class StructTreeWriter extends TreeWriterBase {
+  /** Writers for the struct's fields, in the order of schema.getChildren(). */
+  final TreeWriter[] childrenWriters;
+
+  /**
+   * Create a writer for a struct column and one child writer per field.
+   * @param columnId the column id of the struct column
+   * @param schema the struct's type description
+   * @param writer the writer context used to create streams
+   * @param nullable whether the struct column itself may contain nulls
+   */
+  public StructTreeWriter(int columnId,
+                          TypeDescription schema,
+                          WriterContext writer,
+                          boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    List<TypeDescription> children = schema.getChildren();
+    // Allocate with the declared element type. A TreeWriterBase[] would throw
+    // ArrayStoreException if Factory.create ever returned a TreeWriter that
+    // does not extend TreeWriterBase.
+    childrenWriters = new TreeWriter[children.size()];
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      // fields are always created as nullable
+      childrenWriters[i] = Factory.create(children.get(i), writer, true);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Write the top-level batch: field i of the root struct is written from
+   * {@code batch.cols[i]}.
+   * @param batch the batch to write from
+   * @param offset the first row to write
+   * @param length the number of rows to write
+   */
+  @Override
+  public void writeRootBatch(VectorizedRowBatch batch, int offset,
+                             int length) throws IOException {
+    // update the statistics for the root column
+    indexStatistics.increment(length);
+    // The root column is assumed to be non-nullable, so isPresent does not
+    // need to be updated here.
+    for (int i = 0; i < childrenWriters.length; ++i) {
+      childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+    }
+  }
+
+  /**
+   * Write rows [offset, offset + length) of every field vector to the
+   * matching child writer.
+   */
+  private static void writeFields(StructColumnVector vector,
+                                  TreeWriter[] childrenWriters,
+                                  int offset, int length) throws IOException {
+    for (int field = 0; field < childrenWriters.length; ++field) {
+      childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+    }
+  }
+
+  /**
+   * Write {@code length} struct values starting at {@code offset}.
+   * Child writers only receive rows whose struct value is non-null. With
+   * nulls present in a non-repeating vector, the fields are written in
+   * contiguous runs of non-null rows; a repeating null struct writes nothing
+   * to the children.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    StructColumnVector vec = (StructColumnVector) vector;
+    if (vector.isRepeating) {
+      if (vector.noNulls || !vector.isNull[0]) {
+        writeFields(vec, childrenWriters, offset, length);
+      }
+    } else if (vector.noNulls) {
+      writeFields(vec, childrenWriters, offset, length);
+    } else {
+      // write the records in runs of consecutive non-null rows
+      int currentRun = 0;
+      boolean started = false;
+      for (int i = 0; i < length; ++i) {
+        if (!vec.isNull[i + offset]) {
+          if (!started) {
+            started = true;
+            currentRun = i;
+          }
+        } else if (started) {
+          started = false;
+          writeFields(vec, childrenWriters, offset + currentRun,
+              i - currentRun);
+        }
+      }
+      // flush the trailing run, if the batch ended on non-null rows
+      if (started) {
+        writeFields(vec, childrenWriters, offset + currentRun,
+            length - currentRun);
+      }
+    }
+  }
+
+  /** Create this column's row index entry, then one for each child. */
+  @Override
+  public void createRowIndexEntry() throws IOException {
+    super.createRowIndexEntry();
+    for (TreeWriter child : childrenWriters) {
+      child.createRowIndexEntry();
+    }
+  }
+
+  /**
+   * Write this column's stripe data, then each child's, and record the
+   * starting positions for the next stripe when a row index is built.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    for (TreeWriter child : childrenWriters) {
+      child.writeStripe(builder, stats, requiredIndexEntries);
+    }
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /** Merge appended-stripe statistics into this column and each child. */
+  @Override
+  public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+    super.updateFileStatistics(stats);
+    for (TreeWriter child : childrenWriters) {
+      child.updateFileStatistics(stats);
+    }
+  }
+
+  /** @return the base estimate plus the estimates of all child writers */
+  @Override
+  public long estimateMemory() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.estimateMemory();
+    }
+    return super.estimateMemory() + result;
+  }
+
+  /** @return the sum of the children's raw data sizes */
+  @Override
+  public long getRawDataSize() {
+    long result = 0;
+    for (TreeWriter writer : childrenWriters) {
+      result += writer.getRawDataSize();
+    }
+    return result;
+  }
+
+  /** Add this column's and then each child's statistics to the footer. */
+  @Override
+  public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+    super.writeFileStatistics(footer);
+    for (TreeWriter child : childrenWriters) {
+      child.writeFileStatistics(footer);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
new file mode 100644
index 0000000..fae108e
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/TimestampTreeWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.IntegerWriter;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.SerializationUtils;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+/**
+ * Tree writer for timestamp columns. Each value is split into a seconds
+ * value (DATA stream) and an encoded nanosecond value (SECONDARY stream).
+ * The seconds are relative to BASE_TIMESTAMP_STRING interpreted in the
+ * default time zone at construction.
+ */
+public class TimestampTreeWriter extends TreeWriterBase {
+  public static final int MILLIS_PER_SECOND = 1000;
+  public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+  private final IntegerWriter seconds;
+  private final IntegerWriter nanos;
+  private final boolean isDirectV2;
+  private final TimeZone localTimezone;
+  private final long baseEpochSecsLocalTz;
+
+  public TimestampTreeWriter(int columnId,
+                             TypeDescription schema,
+                             WriterContext writer,
+                             boolean nullable) throws IOException {
+    super(columnId, schema, writer, nullable);
+    this.isDirectV2 = isNewWriteFormat(writer);
+    // seconds are signed, encoded nanos are unsigned
+    this.seconds = createIntegerWriter(
+        writer.createStream(id, OrcProto.Stream.Kind.DATA),
+        true, isDirectV2, writer);
+    this.nanos = createIntegerWriter(
+        writer.createStream(id, OrcProto.Stream.Kind.SECONDARY),
+        false, isDirectV2, writer);
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+    // Both values are captured per instance rather than statically, so that
+    // unit tests can set different default time zones.
+    this.localTimezone = TimeZone.getDefault();
+    this.baseEpochSecsLocalTz =
+        Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+  }
+
+  /** @return the base encoding with kind DIRECT_V2 or DIRECT */
+  @Override
+  OrcProto.ColumnEncoding.Builder getEncoding() {
+    final OrcProto.ColumnEncoding.Kind kind = isDirectV2
+        ? OrcProto.ColumnEncoding.Kind.DIRECT_V2
+        : OrcProto.ColumnEncoding.Kind.DIRECT;
+    return super.getEncoding().setKind(kind);
+  }
+
+  /**
+   * Write {@code length} timestamps starting at {@code offset}; null entries
+   * are skipped. A repeating vector writes its first entry {@code length}
+   * times but updates statistics and bloom filters once.
+   *
+   * NOTE(review): {@code millis / MILLIS_PER_SECOND} truncates toward zero,
+   * so pre-1970 values with a fractional second get a seconds value one
+   * above the floor. Readers presumably compensate, so confirm before
+   * changing this, as it is part of the on-disk format.
+   */
+  @Override
+  public void writeBatch(ColumnVector vector, int offset,
+                         int length) throws IOException {
+    super.writeBatch(vector, offset, length);
+    final TimestampColumnVector tsVector = (TimestampColumnVector) vector;
+    if (!vector.isRepeating) {
+      for (int i = 0; i < length; ++i) {
+        final int row = offset + i;
+        if (tsVector.noNulls || !tsVector.isNull[row]) {
+          final Timestamp ts = tsVector.asScratchTimestamp(row);
+          final long millis = ts.getTime();
+          seconds.write(millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz);
+          nanos.write(formatNanos(ts.getNanos()));
+          addTimestampToIndex(millis);
+        }
+      }
+    } else if (vector.noNulls || !vector.isNull[0]) {
+      final Timestamp ts = tsVector.asScratchTimestamp(0);
+      final long millis = ts.getTime();
+      addTimestampToIndex(millis);
+      final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
+      final long encodedNanos = formatNanos(ts.getNanos());
+      for (int i = 0; i < length; ++i) {
+        seconds.write(secs);
+        nanos.write(encodedNanos);
+      }
+    }
+  }
+
+  /**
+   * Update the index statistics with the UTC-converted value. When bloom
+   * filters are enabled, add the raw millis to the original-format filter
+   * (if present) and the UTC value to the UTF-8 filter.
+   */
+  private void addTimestampToIndex(long millis) {
+    final long utc = SerializationUtils.convertToUtc(localTimezone, millis);
+    indexStatistics.updateTimestamp(utc);
+    if (createBloomFilter) {
+      if (bloomFilter != null) {
+        bloomFilter.addLong(millis);
+      }
+      bloomFilterUtf8.addLong(utc);
+    }
+  }
+
+  /**
+   * Flush the base streams and the seconds/nanos writers, then record the
+   * starting positions for the next stripe when a row index is built.
+   */
+  @Override
+  public void writeStripe(OrcProto.StripeFooter.Builder builder,
+                          OrcProto.StripeStatistics.Builder stats,
+                          int requiredIndexEntries) throws IOException {
+    super.writeStripe(builder, stats, requiredIndexEntries);
+    seconds.flush();
+    nanos.flush();
+    if (rowIndexPosition != null) {
+      recordPosition(rowIndexPosition);
+    }
+  }
+
+  /**
+   * Encode a nanosecond value for the SECONDARY stream.
+   *
+   * Zero encodes as 0. A value not divisible by 100 is stored as
+   * {@code nanos << 3}. Otherwise decimal trailing zeros are stripped, and
+   * the low 3 bits hold a code c in 1..7 meaning the stored value was divided
+   * by 10^(c+1).
+   */
+  private static long formatNanos(int nanos) {
+    if (nanos == 0) {
+      return 0;
+    }
+    if (nanos % 100 != 0) {
+      return ((long) nanos) << 3;
+    }
+    int mantissa = nanos / 100;
+    int zeroCode = 1;
+    for (; zeroCode < 7 && mantissa % 10 == 0; ++zeroCode) {
+      mantissa /= 10;
+    }
+    return (((long) mantissa) << 3) | zeroCode;
+  }
+
+  /** Record base positions, then the seconds and nanos stream positions. */
+  @Override
+  void recordPosition(PositionRecorder recorder) throws IOException {
+    super.recordPosition(recorder);
+    seconds.getPosition(recorder);
+    nanos.getPosition(recorder);
+  }
+
+  /** @return the base estimate plus the seconds and nanos writer buffers */
+  @Override
+  public long estimateMemory() {
+    long total = super.estimateMemory();
+    total += seconds.estimateMemory();
+    total += nanos.estimateMemory();
+    return total;
+  }
+
+  /** @return the value count times JavaDataModel's timestamp size */
+  @Override
+  public long getRawDataSize() {
+    final long valueCount = fileStatistics.getNumberOfValues();
+    return valueCount * JavaDataModel.get().lengthOfTimestamp();
+  }
+}
[3/4] orc git commit: ORC-194. Split TreeWriters out of WriterImpl.
Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/orc/blob/ded204a4/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index cfdddad..a5d65dd 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -20,10 +20,7 @@ package org.apache.orc.impl;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -34,43 +31,25 @@ import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;
import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.orc.BinaryColumnStatistics;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.MemoryManager;
-import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
import org.apache.orc.PhysicalWriter;
-import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
-import org.apache.orc.util.BloomFilter;
-import org.apache.orc.util.BloomFilterIO;
-import org.apache.orc.util.BloomFilterUtf8;
+import org.apache.orc.impl.writer.TreeWriter;
+import org.apache.orc.impl.writer.WriterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.Text;
import com.google.protobuf.ByteString;
@@ -108,7 +87,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final PhysicalWriter physicalWriter;
private final OrcFile.WriterVersion writerVersion;
- private int columnCount;
private long rowCount = 0;
private long rowsInStripe = 0;
private long rawDataSize = 0;
@@ -133,7 +111,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private final boolean[] bloomFilterColumns;
private final double bloomFilterFpp;
private final OrcFile.BloomFilterVersion bloomFilterVersion;
- private boolean writeTimeZone;
+ private final boolean writeTimeZone;
public WriterImpl(FileSystem fs,
Path path,
@@ -155,6 +133,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
callbackContext = null;
}
+ writeTimeZone = hasTimestamp(schema);
this.adjustedStripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
@@ -181,7 +160,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.physicalWriter = opts.getPhysicalWriter() == null ?
new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
physicalWriter.writeHeader();
- treeWriter = createTreeWriter(schema, new StreamFactory(), false);
+ treeWriter = TreeWriter.Factory.create(schema, new StreamFactory(), false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
MIN_ROW_INDEX_STRIDE);
@@ -278,2524 +257,142 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return false;
}
- /**
- * PositionRecorder that appends every recorded stream position to the
- * positions list of a row index entry builder.
- */
- private static class RowIndexPositionRecorder implements PositionRecorder {
- // the row index entry that receives the recorded positions
- private final OrcProto.RowIndexEntry.Builder builder;
- RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
- this.builder = builder;
- }
-
- @Override
- public void addPosition(long position) {
- builder.addPositions(position);
- }
- }
-
- /**
- * Get the compression codec to use for a stream of the given kind.
- * DATA, DICTIONARY_DATA, BLOOM_FILTER and BLOOM_FILTER_UTF8 streams get the
- * TEXT modifier, combined with FAST under the SPEED compression strategy
- * and DEFAULT otherwise. LENGTH, DICTIONARY_COUNT, PRESENT, ROW_INDEX and
- * SECONDARY streams get FASTEST and BINARY. Other kinds keep the unmodified
- * codec, and a message is logged.
- * @param kind the kind of stream being written
- * @return the customized codec, or null when the writer has no codec
- */
- CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
- // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
- // but at this point there's no close() for the stream.
- CompressionCodec result = physicalWriter.getCompressionCodec();
- if (result != null) {
- switch (kind) {
- case BLOOM_FILTER:
- case DATA:
- case DICTIONARY_DATA:
- case BLOOM_FILTER_UTF8:
- if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
- result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
- CompressionCodec.Modifier.TEXT));
- } else {
- result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
- CompressionCodec.Modifier.TEXT));
- }
- break;
- case LENGTH:
- case DICTIONARY_COUNT:
- case PRESENT:
- case ROW_INDEX:
- case SECONDARY:
- // easily compressed using the fastest modes
- result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
- CompressionCodec.Modifier.BINARY));
- break;
- default:
- // no modifiers defined for this kind; the codec is used as-is
- LOG.info("Missing ORC compression modifiers for " + kind);
- break;
- }
- }
- return result;
- }
-
- /**
- * Interface from the Writer to the TreeWriters. This limits the visibility
- * that the TreeWriters have into the Writer.
- */
- private class StreamFactory {
- /**
- * Create a stream to store part of a column.
- * @param column the column id for the stream
- * @param kind the kind of stream
- * @return The output outStream that the section needs to be written to.
- */
- public OutStream createStream(int column,
- OrcProto.Stream.Kind kind
- ) throws IOException {
- final StreamName name = new StreamName(column, kind);
- // codec modifiers depend on the stream kind
- CompressionCodec codec = getCustomizedCodec(kind);
-
- return new OutStream(physicalWriter.toString(), bufferSize, codec,
- physicalWriter.createDataStream(name));
- }
-
- /**
- * Get the next column id. Each call advances the writer's column counter.
- * @return a number from 0 to the number of columns - 1
- */
- public int getNextColumnId() {
- return columnCount++;
- }
-
- /**
- * Get the stride rate of the row index.
- * @return the row index stride
- */
- public int getRowIndexStride() {
- return rowIndexStride;
- }
-
- /**
- * Should be building the row index.
- * @return true if we are building the index
- */
- public boolean buildIndex() {
- return buildIndex;
- }
-
- /**
- * Is the ORC file compressed?
- * @return are the streams compressed
- */
- public boolean isCompressed() {
- return physicalWriter.getCompressionCodec() != null;
- }
-
- /**
- * Get the encoding strategy to use.
- * @return encoding strategy
- */
- public OrcFile.EncodingStrategy getEncodingStrategy() {
- return encodingStrategy;
- }
-
- /**
- * Get the bloom filter columns
- * @return bloom filter columns
- */
- public boolean[] getBloomFilterColumns() {
- return bloomFilterColumns;
- }
-
- /**
- * Get bloom filter false positive percentage.
- * @return fpp
- */
- public double getBloomFilterFPP() {
- return bloomFilterFpp;
- }
-
- /**
- * Get the writer's configuration.
- * @return configuration
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Get the version of the file to write.
- * @return the file version
- */
- public OrcFile.Version getVersion() {
- return version;
- }
-
- /**
- * Set whether stripe footers should record the writer's time zone.
- * @param val true to record the writer's time zone
- */
- public void useWriterTimeZone(boolean val) {
- writeTimeZone = val;
- }
-
- /**
- * @return true if stripe footers should record the writer's time zone
- */
- public boolean hasWriterTimeZone() {
- return writeTimeZone;
- }
-
- /**
- * @return the bloom filter version; the original-format filter is only
- * built when this is ORIGINAL
- */
- public OrcFile.BloomFilterVersion getBloomFilterVersion() {
- return bloomFilterVersion;
- }
-
- /**
- * Write a column's row index through the physical writer, using the codec
- * customized for the stream's kind.
- * @param name the stream name (column and kind)
- * @param index the row index to write
- */
- public void writeIndex(StreamName name,
- OrcProto.RowIndex.Builder index) throws IOException {
- physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
- }
-
- /**
- * Write a column's bloom filter index through the physical writer, using
- * the codec customized for the stream's kind.
- * @param name the stream name (column and kind)
- * @param bloom the bloom filter index to write
- */
- public void writeBloomFilter(StreamName name,
- OrcProto.BloomFilterIndex.Builder bloom
- ) throws IOException {
- physicalWriter.writeBloomFilter(name, bloom,
- getCustomizedCodec(name.getKind()));
- }
- }
-
- /**
- * The common API implemented by the column writer for each type. Compound
- * types (struct, list, map and union) delegate to child tree writers.
- */
- interface TreeWriter {
-
- /**
- * Estimate the memory currently used to buffer the stripe.
- * @return the number of bytes
- */
- long estimateMemory();
-
- /**
- * Estimate the memory used if the file was read into Hive's Writable
- * types. This is used as an estimate for the query optimizer.
- * @return the number of bytes
- */
- long getRawDataSize();
-
- /**
- * Write a VectorizedRowBatch to the file. This is called by the WriterImpl
- * at the top level.
- * @param batch the list of all of the columns
- * @param offset the first row from the batch to write
- * @param length the number of rows to write
- */
- void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException;
-
- /**
- * Write a ColumnVector to the file. This is called by writeRootBatch and
- * recursively by compound writers for their children.
- * @param vector the data to write
- * @param offset the first value offset to write.
- * @param length the number of values to write
- */
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException;
-
- /**
- * Create a row index entry at the current point in the stripe.
- */
- void createRowIndexEntry() throws IOException;
-
- /**
- * Write the stripe out to the file.
- * @param stripeFooter the stripe footer that contains the information about the
- * layout of the stripe. The TreeWriterBase is required to update
- * the footer with its information.
- * @param stats the stripe statistics information
- * @param requiredIndexEntries the number of index entries that are
- * required. This is used to check that the
- * row index is well formed.
- */
- void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException;
-
- /**
- * During a stripe append, we need to update the file statistics.
- * @param stripeStatistics the statistics for the new stripe
- */
- void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
-
- /**
- * Add the file statistics to the file footer.
- * @param footer the file footer builder
- */
- void writeFileStatistics(OrcProto.Footer.Builder footer);
- }
-
- /**
- * The parent class of all of the writers for each column. Each column
- * is written by an instance of this class. The compound types (struct,
- * list, map, and union) have children tree writers that write the children
- * types.
- */
- private abstract static class TreeWriterBase implements TreeWriter {
- protected final int id;
- protected final BitFieldWriter isPresent;
- private final boolean isCompressed;
- protected final ColumnStatisticsImpl indexStatistics;
- protected final ColumnStatisticsImpl stripeColStatistics;
- protected final ColumnStatisticsImpl fileStatistics;
- protected final RowIndexPositionRecorder rowIndexPosition;
- private final OrcProto.RowIndex.Builder rowIndex;
- private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
- protected final BloomFilter bloomFilter;
- protected final BloomFilterUtf8 bloomFilterUtf8;
- protected final boolean createBloomFilter;
- private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
- private final OrcProto.BloomFilterIndex.Builder bloomFilterIndexUtf8;
- protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
- private boolean foundNulls;
- private OutStream isPresentOutStream;
- private final StreamFactory streamFactory;
-
- /**
- * Create a tree writer.
- * @param columnId the column id of the column to write
- * @param schema the row schema
- * @param streamFactory limited access to the Writer's data.
- * @param nullable can the value be null?
- */
- TreeWriterBase(int columnId,
- TypeDescription schema,
- StreamFactory streamFactory,
- boolean nullable) throws IOException {
- this.streamFactory = streamFactory;
- this.isCompressed = streamFactory.isCompressed();
- this.id = columnId;
- if (nullable) {
- isPresentOutStream = streamFactory.createStream(id,
- OrcProto.Stream.Kind.PRESENT);
- isPresent = new BitFieldWriter(isPresentOutStream, 1);
- } else {
- isPresent = null;
- }
- this.foundNulls = false;
- createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
- indexStatistics = ColumnStatisticsImpl.create(schema);
- stripeColStatistics = ColumnStatisticsImpl.create(schema);
- fileStatistics = ColumnStatisticsImpl.create(schema);
- if (streamFactory.buildIndex()) {
- rowIndex = OrcProto.RowIndex.newBuilder();
- rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
- rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
- } else {
- rowIndex = null;
- rowIndexEntry = null;
- rowIndexPosition = null;
- }
- if (createBloomFilter) {
- bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
- if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
- bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(),
- streamFactory.getBloomFilterFPP());
- bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
- } else {
- bloomFilter = null;
- bloomFilterIndex = null;
- }
- bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(),
- streamFactory.getBloomFilterFPP());
- bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder();
- } else {
- bloomFilterEntry = null;
- bloomFilterIndex = null;
- bloomFilterIndexUtf8 = null;
- bloomFilter = null;
- bloomFilterUtf8 = null;
- }
- }
-
- protected OrcProto.RowIndex.Builder getRowIndex() {
- return rowIndex;
- }
-
- protected ColumnStatisticsImpl getStripeStatistics() {
- return stripeColStatistics;
- }
-
- protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
- return rowIndexEntry;
- }
-
- IntegerWriter createIntegerWriter(PositionedOutputStream output,
- boolean signed, boolean isDirectV2,
- StreamFactory writer) {
- if (isDirectV2) {
- boolean alignedBitpacking = false;
- if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
- alignedBitpacking = true;
- }
- return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
- } else {
- return new RunLengthIntegerWriter(output, signed);
- }
- }
-
- boolean isNewWriteFormat(StreamFactory writer) {
- return writer.getVersion() != OrcFile.Version.V_0_11;
- }
-
- /**
- * Handle the top level object write.
- *
- * This default method is used for all types except structs, which are the
- * typical case. VectorizedRowBatch assumes the top level object is a
- * struct, so we use the first column for all other types.
- * @param batch the batch to write from
- * @param offset the row to start on
- * @param length the number of rows to write
- */
- public void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException {
- writeBatch(batch.cols[0], offset, length);
- }
-
- /**
- * Write the values from the given vector from offset for length elements.
- * @param vector the vector to write from
- * @param offset the first value from the vector to write
- * @param length the number of values from the vector to write
- */
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- if (vector.noNulls) {
- indexStatistics.increment(length);
- if (isPresent != null) {
- for (int i = 0; i < length; ++i) {
- isPresent.write(1);
- }
- }
- } else {
- if (vector.isRepeating) {
- boolean isNull = vector.isNull[0];
- if (isPresent != null) {
- for (int i = 0; i < length; ++i) {
- isPresent.write(isNull ? 0 : 1);
- }
- }
- if (isNull) {
- foundNulls = true;
- indexStatistics.setNull();
- } else {
- indexStatistics.increment(length);
- }
- } else {
- // count the number of non-null values
- int nonNullCount = 0;
- for(int i = 0; i < length; ++i) {
- boolean isNull = vector.isNull[i + offset];
- if (!isNull) {
- nonNullCount += 1;
- }
- if (isPresent != null) {
- isPresent.write(isNull ? 0 : 1);
- }
- }
- indexStatistics.increment(nonNullCount);
- if (nonNullCount != length) {
- foundNulls = true;
- indexStatistics.setNull();
- }
- }
- }
- }
-
- private void removeIsPresentPositions() {
- for(int i=0; i < rowIndex.getEntryCount(); ++i) {
- OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
- List<Long> positions = entry.getPositionsList();
- // bit streams use 3 positions if uncompressed, 4 if compressed
- positions = positions.subList(isCompressed ? 4 : 3, positions.size());
- entry.clearPositions();
- entry.addAllPositions(positions);
- }
- }
-
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- if (isPresent != null) {
- isPresent.flush();
-
- // if no nulls are found in a stream, then suppress the stream
- if(!foundNulls) {
- isPresentOutStream.suppress();
- // since isPresent bitstream is suppressed, update the index to
- // remove the positions of the isPresent stream
- if (rowIndex != null) {
- removeIsPresentPositions();
- }
- }
- }
-
- // merge stripe-level column statistics to file statistics and write it to
- // stripe statistics
- fileStatistics.merge(stripeColStatistics);
- stats.addColStats(stripeColStatistics.serialize());
- stripeColStatistics.reset();
-
- // reset the flag for next stripe
- foundNulls = false;
-
- builder.addColumns(getEncoding());
- if (streamFactory.hasWriterTimeZone()) {
- builder.setWriterTimezone(TimeZone.getDefault().getID());
- }
- if (rowIndex != null) {
- if (rowIndex.getEntryCount() != requiredIndexEntries) {
- throw new IllegalArgumentException("Column has wrong number of " +
- "index entries found: " + rowIndex.getEntryCount() + " expected: " +
- requiredIndexEntries);
- }
- streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex);
- rowIndex.clear();
- rowIndexEntry.clear();
- }
-
- // write the bloom filter to out stream
- if (bloomFilterIndex != null) {
- streamFactory.writeBloomFilter(new StreamName(id,
- OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex);
- bloomFilterIndex.clear();
- }
- // write the bloom filter to out stream
- if (bloomFilterIndexUtf8 != null) {
- streamFactory.writeBloomFilter(new StreamName(id,
- OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8);
- bloomFilterIndexUtf8.clear();
- }
- }
-
- /**
- * Get the encoding for this column.
- * @return the information about the encoding of this column
- */
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder builder =
- OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- if (createBloomFilter) {
- builder.setBloomEncoding(BloomFilterIO.Encoding.CURRENT.getId());
- }
- return builder;
- }
-
- /**
- * Create a row index entry with the previous location and the current
- * index statistics. Also merges the index statistics into the file
- * statistics before they are cleared. Finally, it records the start of the
- * next index and ensures all of the children columns also create an entry.
- */
- public void createRowIndexEntry() throws IOException {
- stripeColStatistics.merge(indexStatistics);
- rowIndexEntry.setStatistics(indexStatistics.serialize());
- indexStatistics.reset();
- rowIndex.addEntry(rowIndexEntry);
- rowIndexEntry.clear();
- addBloomFilterEntry();
- recordPosition(rowIndexPosition);
- }
-
- void addBloomFilterEntry() {
- if (createBloomFilter) {
- if (bloomFilter != null) {
- BloomFilterIO.serialize(bloomFilterEntry, bloomFilter);
- bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
- bloomFilter.reset();
- }
- if (bloomFilterUtf8 != null) {
- BloomFilterIO.serialize(bloomFilterEntry, bloomFilterUtf8);
- bloomFilterIndexUtf8.addBloomFilter(bloomFilterEntry.build());
- bloomFilterUtf8.reset();
- }
- }
- }
-
- @Override
- public void updateFileStatistics(OrcProto.StripeStatistics stats) {
- fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
- }
-
- /**
- * Record the current position in each of this column's streams.
- * @param recorder where should the locations be recorded
- */
- void recordPosition(PositionRecorder recorder) throws IOException {
- if (isPresent != null) {
- isPresent.getPosition(recorder);
- }
- }
-
- /**
- * Estimate how much memory the writer is consuming excluding the streams.
- * @return the number of bytes.
- */
- public long estimateMemory() {
- long result = 0;
- if (isPresent != null) {
- result = isPresentOutStream.getBufferSize();
- }
- return result;
- }
-
- @Override
- public void writeFileStatistics(OrcProto.Footer.Builder footer) {
- footer.addStatistics(fileStatistics.serialize());
- }
- }
-
- private static class BooleanTreeWriter extends TreeWriterBase {
- private final BitFieldWriter writer;
-
- BooleanTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- PositionedOutputStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.writer = new BitFieldWriter(out, 1);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int value = vec.vector[0] == 0 ? 0 : 1;
- indexStatistics.updateBoolean(value != 0, length);
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int value = vec.vector[i + offset] == 0 ? 0 : 1;
- writer.write(value);
- indexStatistics.updateBoolean(value != 0, 1);
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- writer.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + writer.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- long num = fileStatistics.getNumberOfValues();
- return num * JavaDataModel.get().primitive1();
- }
- }
-
- private static class ByteTreeWriter extends TreeWriterBase {
- private final RunLengthByteWriter writer;
-
- ByteTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.writer = new RunLengthByteWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA));
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- byte value = (byte) vec.vector[0];
- indexStatistics.updateInteger(value, length);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(value);
- }
- bloomFilterUtf8.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- byte value = (byte) vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateInteger(value, 1);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(value);
- }
- bloomFilterUtf8.addLong(value);
- }
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- writer.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + writer.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- long num = fileStatistics.getNumberOfValues();
- return num * JavaDataModel.get().primitive1();
- }
- }
-
- private static class IntegerTreeWriter extends TreeWriterBase {
- private final IntegerWriter writer;
- private boolean isDirectV2 = true;
- private final boolean isLong;
-
- IntegerTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- OutStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- long value = vec.vector[0];
- indexStatistics.updateInteger(value, length);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(value);
- }
- bloomFilterUtf8.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- long value = vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateInteger(value, 1);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(value);
- }
- bloomFilterUtf8.addLong(value);
- }
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- writer.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + writer.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- JavaDataModel jdm = JavaDataModel.get();
- long num = fileStatistics.getNumberOfValues();
- return num * (isLong ? jdm.primitive2() : jdm.primitive1());
- }
- }
-
- private static class FloatTreeWriter extends TreeWriterBase {
- private final PositionedOutputStream stream;
- private final SerializationUtils utils;
-
- FloatTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.utils = new SerializationUtils();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DoubleColumnVector vec = (DoubleColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- float value = (float) vec.vector[0];
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addDouble(value);
- }
- bloomFilterUtf8.addDouble(value);
- }
- for(int i=0; i < length; ++i) {
- utils.writeFloat(stream, value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- float value = (float) vec.vector[i + offset];
- utils.writeFloat(stream, value);
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addDouble(value);
- }
- bloomFilterUtf8.addDouble(value);
- }
- }
- }
- }
- }
-
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- stream.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + stream.getBufferSize();
- }
-
- @Override
- public long getRawDataSize() {
- long num = fileStatistics.getNumberOfValues();
- return num * JavaDataModel.get().primitive1();
- }
- }
-
- private static class DoubleTreeWriter extends TreeWriterBase {
- private final PositionedOutputStream stream;
- private final SerializationUtils utils;
-
- DoubleTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.utils = new SerializationUtils();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DoubleColumnVector vec = (DoubleColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- double value = vec.vector[0];
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addDouble(value);
- }
- bloomFilterUtf8.addDouble(value);
- }
- for(int i=0; i < length; ++i) {
- utils.writeDouble(stream, value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- double value = vec.vector[i + offset];
- utils.writeDouble(stream, value);
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addDouble(value);
- }
- bloomFilterUtf8.addDouble(value);
- }
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- stream.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + stream.getBufferSize();
- }
-
- @Override
- public long getRawDataSize() {
- long num = fileStatistics.getNumberOfValues();
- return num * JavaDataModel.get().primitive2();
- }
- }
-
- private static abstract class StringBaseTreeWriter extends TreeWriterBase {
- private static final int INITIAL_DICTIONARY_SIZE = 4096;
- private final OutStream stringOutput;
- protected final IntegerWriter lengthOutput;
- private final IntegerWriter rowOutput;
- protected final StringRedBlackTree dictionary =
- new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
- protected final DynamicIntArray rows = new DynamicIntArray();
- protected final PositionedOutputStream directStreamOutput;
- private final List<OrcProto.RowIndexEntry> savedRowIndex =
- new ArrayList<>();
- private final boolean buildIndex;
- private final List<Long> rowIndexValueCount = new ArrayList<>();
- // If the number of keys in a dictionary is greater than this fraction of
- //the total number of non-null rows, turn off dictionary encoding
- private final double dictionaryKeySizeThreshold;
- protected boolean useDictionaryEncoding = true;
- private boolean isDirectV2 = true;
- private boolean doneDictionaryCheck;
- private final boolean strideDictionaryCheck;
-
- StringBaseTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- stringOutput = writer.createStream(id,
- OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2,
- writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- rowIndexValueCount.add(0L);
- buildIndex = writer.buildIndex();
- Configuration conf = writer.getConfiguration();
- dictionaryKeySizeThreshold =
- OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
- strideDictionaryCheck =
- OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
- doneDictionaryCheck = false;
- }
-
- private void checkDictionaryEncoding() {
- if (!doneDictionaryCheck) {
- // Set the flag indicating whether or not to use dictionary encoding
- // based on whether or not the fraction of distinct keys over number of
- // non-null rows is less than the configured threshold
- float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
- useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
- doneDictionaryCheck = true;
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- // if rows in stripe is less than dictionaryCheckAfterRows, dictionary
- // checking would not have happened. So do it again here.
- checkDictionaryEncoding();
-
- if (useDictionaryEncoding) {
- flushDictionary();
- } else {
- // flushout any left over entries from dictionary
- if (rows.size() > 0) {
- flushDictionary();
- }
-
- // suppress the stream for every stripe if dictionary is disabled
- stringOutput.suppress();
- }
-
- // we need to build the rowindex before calling super, since it
- // writes it out.
- super.writeStripe(builder, stats, requiredIndexEntries);
- if (useDictionaryEncoding) {
- stringOutput.flush();
- lengthOutput.flush();
- rowOutput.flush();
- } else {
- directStreamOutput.flush();
- lengthOutput.flush();
- }
- // reset all of the fields to be ready for the next stripe.
- dictionary.clear();
- savedRowIndex.clear();
- rowIndexValueCount.clear();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- rowIndexValueCount.add(0L);
-
- if (!useDictionaryEncoding) {
- // record the start positions of first index stride of next stripe i.e
- // beginning of the direct streams when dictionary is disabled
- recordDirectStreamPosition();
- }
- }
-
- private void flushDictionary() throws IOException {
- final int[] dumpOrder = new int[dictionary.size()];
-
- if (useDictionaryEncoding) {
- // Write the dictionary by traversing the red-black tree writing out
- // the bytes and lengths; and creating the map from the original order
- // to the final sorted order.
-
- dictionary.visit(new StringRedBlackTree.Visitor() {
- private int currentId = 0;
- @Override
- public void visit(StringRedBlackTree.VisitorContext context
- ) throws IOException {
- context.writeBytes(stringOutput);
- lengthOutput.write(context.getLength());
- dumpOrder[context.getOriginalPosition()] = currentId++;
- }
- });
- } else {
- // for direct encoding, we don't want the dictionary data stream
- stringOutput.suppress();
- }
- int length = rows.size();
- int rowIndexEntry = 0;
- OrcProto.RowIndex.Builder rowIndex = getRowIndex();
- Text text = new Text();
- // write the values translated into the dump order.
- for(int i = 0; i <= length; ++i) {
- // now that we are writing out the row values, we can finalize the
- // row index
- if (buildIndex) {
- while (i == rowIndexValueCount.get(rowIndexEntry) &&
- rowIndexEntry < savedRowIndex.size()) {
- OrcProto.RowIndexEntry.Builder base =
- savedRowIndex.get(rowIndexEntry++).toBuilder();
- if (useDictionaryEncoding) {
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- } else {
- PositionRecorder posn = new RowIndexPositionRecorder(base);
- directStreamOutput.getPosition(posn);
- lengthOutput.getPosition(posn);
- }
- rowIndex.addEntry(base.build());
- }
- }
- if (i != length) {
- if (useDictionaryEncoding) {
- rowOutput.write(dumpOrder[rows.get(i)]);
- } else {
- dictionary.getText(text, rows.get(i));
- directStreamOutput.write(text.getBytes(), 0, text.getLength());
- lengthOutput.write(text.getLength());
- }
- }
- }
- rows.clear();
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (useDictionaryEncoding) {
- result.setDictionarySize(dictionary.size());
- if(isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DICTIONARY);
- }
- } else {
- if(isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- }
- return result;
- }
-
- /**
- * This method doesn't call the super method, because unlike most of the
- * other TreeWriters, this one can't record the position in the streams
- * until the stripe is being flushed. Therefore it saves all of the entries
- * and augments them with the final information as the stripe is written.
- */
- @Override
- public void createRowIndexEntry() throws IOException {
- getStripeStatistics().merge(indexStatistics);
- OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
- rowIndexEntry.setStatistics(indexStatistics.serialize());
- indexStatistics.reset();
- OrcProto.RowIndexEntry base = rowIndexEntry.build();
- savedRowIndex.add(base);
- rowIndexEntry.clear();
- addBloomFilterEntry();
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add((long) rows.size());
- if (strideDictionaryCheck) {
- checkDictionaryEncoding();
- }
- if (!useDictionaryEncoding) {
- if (rows.size() > 0) {
- flushDictionary();
- // just record the start positions of next index stride
- recordDirectStreamPosition();
- } else {
- // record the start positions of next index stride
- recordDirectStreamPosition();
- getRowIndex().addEntry(base);
- }
- }
- }
-
- private void recordDirectStreamPosition() throws IOException {
- if (rowIndexPosition != null) {
- directStreamOutput.getPosition(rowIndexPosition);
- lengthOutput.getPosition(rowIndexPosition);
- }
- }
-
- @Override
- public long estimateMemory() {
- long parent = super.estimateMemory();
- if (useDictionaryEncoding) {
- return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
- } else {
- return parent + lengthOutput.estimateMemory() +
- directStreamOutput.getBufferSize();
- }
- }
-
- @Override
- public long getRawDataSize() {
- // ORC strings are converted to java Strings. so use JavaDataModel to
- // compute the overall size of strings
- StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
- long numVals = fileStatistics.getNumberOfValues();
- if (numVals == 0) {
- return 0;
- } else {
- int avgSize = (int) (scs.getSum() / numVals);
- return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
- }
- }
- }
-
- private static class StringTreeWriter extends StringBaseTreeWriter {
- StringTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- if (useDictionaryEncoding) {
- int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(vec.vector[0], vec.start[0],
- vec.length[0]);
- lengthOutput.write(vec.length[0]);
- }
- }
- indexStatistics.updateString(vec.vector[0], vec.start[0],
- vec.length[0], length);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- // translate from UTF-8 to the default charset
- bloomFilter.addString(new String(vec.vector[0], vec.start[0],
- vec.length[0], StandardCharsets.UTF_8));
- }
- bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]));
- } else {
- directStreamOutput.write(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- lengthOutput.write(vec.length[offset + i]);
- }
- indexStatistics.updateString(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i], 1);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- // translate from UTF-8 to the default charset
- bloomFilter.addString(new String(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i],
- StandardCharsets.UTF_8));
- }
- bloomFilterUtf8.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
- }
-
- /**
- * Under the covers, char is written to ORC the same way as string.
- */
- private static class CharTreeWriter extends StringBaseTreeWriter {
- private final int itemLength;
- private final byte[] padding;
-
- CharTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- itemLength = schema.getMaxLength();
- padding = new byte[itemLength];
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- byte[] ptr;
- int ptrOffset;
- if (vec.length[0] >= itemLength) {
- ptr = vec.vector[0];
- ptrOffset = vec.start[0];
- } else {
- ptr = padding;
- ptrOffset = 0;
- System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
- vec.length[0]);
- Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
- }
- if (useDictionaryEncoding) {
- int id = dictionary.add(ptr, ptrOffset, itemLength);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(ptr, ptrOffset, itemLength);
- lengthOutput.write(itemLength);
- }
- }
- indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- // translate from UTF-8 to the default charset
- bloomFilter.addString(new String(vec.vector[0], vec.start[0],
- vec.length[0], StandardCharsets.UTF_8));
- }
- bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- byte[] ptr;
- int ptrOffset;
- if (vec.length[offset + i] >= itemLength) {
- ptr = vec.vector[offset + i];
- ptrOffset = vec.start[offset + i];
- } else {
- // it is the wrong length, so copy it
- ptr = padding;
- ptrOffset = 0;
- System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
- ptr, 0, vec.length[offset + i]);
- Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
- }
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(ptr, ptrOffset, itemLength));
- } else {
- directStreamOutput.write(ptr, ptrOffset, itemLength);
- lengthOutput.write(itemLength);
- }
- indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- // translate from UTF-8 to the default charset
- bloomFilter.addString(new String(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i],
- StandardCharsets.UTF_8));
- }
- bloomFilterUtf8.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
- }
-
- /**
- * Under the covers, varchar is written to ORC the same way as string.
- */
- private static class VarcharTreeWriter extends StringBaseTreeWriter {
- private final int maxLength;
-
- VarcharTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- maxLength = schema.getMaxLength();
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int itemLength = Math.min(vec.length[0], maxLength);
- if (useDictionaryEncoding) {
- int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(vec.vector[0], vec.start[0],
- itemLength);
- lengthOutput.write(itemLength);
- }
- }
- indexStatistics.updateString(vec.vector[0], vec.start[0],
- itemLength, length);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- // translate from UTF-8 to the default charset
- bloomFilter.addString(new String(vec.vector[0],
- vec.start[0], itemLength,
- StandardCharsets.UTF_8));
- }
- bloomFilterUtf8.addBytes(vec.vector[0],
- vec.start[0], itemLength);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int itemLength = Math.min(vec.length[offset + i], maxLength);
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(vec.vector[offset + i],
- vec.start[offset + i], itemLength));
- } else {
- directStreamOutput.write(vec.vector[offset + i],
- vec.start[offset + i], itemLength);
- lengthOutput.write(itemLength);
- }
- indexStatistics.updateString(vec.vector[offset + i],
- vec.start[offset + i], itemLength, 1);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- // translate from UTF-8 to the default charset
- bloomFilter.addString(new String(vec.vector[offset + i],
- vec.start[offset + i], itemLength,
- StandardCharsets.UTF_8));
- }
- bloomFilterUtf8.addBytes(vec.vector[offset + i],
- vec.start[offset + i], itemLength);
- }
- }
- }
- }
- }
- }
-
- private static class BinaryTreeWriter extends TreeWriterBase {
- private final PositionedOutputStream stream;
- private final IntegerWriter length;
- private boolean isDirectV2 = true;
-
- BinaryTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.length = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- for(int i=0; i < length; ++i) {
- stream.write(vec.vector[0], vec.start[0],
- vec.length[0]);
- this.length.write(vec.length[0]);
- }
- indexStatistics.updateBinary(vec.vector[0], vec.start[0],
- vec.length[0], length);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- bloomFilterUtf8.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- stream.write(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- this.length.write(vec.length[offset + i]);
- indexStatistics.updateBinary(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i], 1);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- bloomFilterUtf8.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
-
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- stream.flush();
- length.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- length.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + stream.getBufferSize() +
- length.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- // get total length of binary blob
- BinaryColumnStatistics bcs = (BinaryColumnStatistics) fileStatistics;
- return bcs.getSum();
- }
- }
-
- public static final int MILLIS_PER_SECOND = 1000;
- public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
-
- private static class TimestampTreeWriter extends TreeWriterBase {
- private final IntegerWriter seconds;
- private final IntegerWriter nanos;
- private final boolean isDirectV2;
- private final TimeZone localTimezone;
- private final long baseEpochSecsLocalTz;
-
- TimestampTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.seconds = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
- this.nanos = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- this.localTimezone = TimeZone.getDefault();
- // for unit tests to set different time zones
- this.baseEpochSecsLocalTz = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
- writer.useWriterTimeZone(true);
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- TimestampColumnVector vec = (TimestampColumnVector) vector;
- Timestamp val;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- val = vec.asScratchTimestamp(0);
- long millis = val.getTime();
- long utc = SerializationUtils.convertToUtc(localTimezone, millis);
- indexStatistics.updateTimestamp(utc);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(millis);
- }
- bloomFilterUtf8.addLong(utc);
- }
- final long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
- final long nano = formatNanos(val.getNanos());
- for(int i=0; i < length; ++i) {
- seconds.write(secs);
- nanos.write(nano);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- val = vec.asScratchTimestamp(i + offset);
- long millis = val.getTime();
- long secs = millis / MILLIS_PER_SECOND - baseEpochSecsLocalTz;
- long utc = SerializationUtils.convertToUtc(localTimezone, millis);
- seconds.write(secs);
- nanos.write(formatNanos(val.getNanos()));
- indexStatistics.updateTimestamp(utc);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(millis);
- }
- bloomFilterUtf8.addLong(utc);
- }
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- seconds.flush();
- nanos.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- private static long formatNanos(int nanos) {
- if (nanos == 0) {
- return 0;
- } else if (nanos % 100 != 0) {
- return ((long) nanos) << 3;
- } else {
- nanos /= 100;
- int trailingZeros = 1;
- while (nanos % 10 == 0 && trailingZeros < 7) {
- nanos /= 10;
- trailingZeros += 1;
- }
- return ((long) nanos) << 3 | trailingZeros;
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- seconds.getPosition(recorder);
- nanos.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + seconds.estimateMemory() +
- nanos.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- return fileStatistics.getNumberOfValues() *
- JavaDataModel.get().lengthOfTimestamp();
- }
- }
-
- private static class DateTreeWriter extends TreeWriterBase {
- private final IntegerWriter writer;
- private final boolean isDirectV2;
-
- DateTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- OutStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int value = (int) vec.vector[0];
- indexStatistics.updateDate(value);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(value);
- }
- bloomFilterUtf8.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int value = (int) vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateDate(value);
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(value);
- }
- bloomFilterUtf8.addLong(value);
- }
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- writer.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + writer.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- return fileStatistics.getNumberOfValues() *
- JavaDataModel.get().lengthOfDate();
- }
- }
-
- private static class DecimalTreeWriter extends TreeWriterBase {
- private final PositionedOutputStream valueStream;
-
- // These scratch buffers allow us to serialize decimals much faster.
- private final long[] scratchLongs;
- private final byte[] scratchBuffer;
-
- private final IntegerWriter scaleStream;
- private final boolean isDirectV2;
-
- DecimalTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
- scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
- this.scaleStream = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DecimalColumnVector vec = (DecimalColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- HiveDecimalWritable value = vec.vector[0];
- indexStatistics.updateDecimal(value);
- if (createBloomFilter) {
- String str = value.toString(scratchBuffer);
- if (bloomFilter != null) {
- bloomFilter.addString(str);
- }
- bloomFilterUtf8.addString(str);
- }
- for(int i=0; i < length; ++i) {
- value.serializationUtilsWrite(valueStream,
- scratchLongs);
- scaleStream.write(value.scale());
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- HiveDecimalWritable value = vec.vector[i + offset];
- value.serializationUtilsWrite(valueStream, scratchLongs);
- scaleStream.write(value.scale());
- indexStatistics.updateDecimal(value);
- if (createBloomFilter) {
- String str = value.toString(scratchBuffer);
- if (bloomFilter != null) {
- bloomFilter.addString(str);
- }
- bloomFilterUtf8.addString(str);
- }
- }
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- valueStream.flush();
- scaleStream.flush();
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- valueStream.getPosition(recorder);
- scaleStream.getPosition(recorder);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + valueStream.getBufferSize() +
- scaleStream.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- return fileStatistics.getNumberOfValues() *
- JavaDataModel.get().lengthOfDecimal();
- }
- }
-
- private static class StructTreeWriter extends TreeWriterBase {
- final TreeWriter[] childrenWriters;
-
- StructTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- List<TypeDescription> children = schema.getChildren();
- childrenWriters = new TreeWriterBase[children.size()];
- for(int i=0; i < childrenWriters.length; ++i) {
- childrenWriters[i] = createTreeWriter(
- children.get(i), writer,
- true);
- }
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException {
- // update the statistics for the root column
- indexStatistics.increment(length);
- // I'm assuming that the root column isn't nullable so that I don't need
- // to update isPresent.
- for(int i=0; i < childrenWriters.length; ++i) {
- childrenWriters[i].writeBatch(batch.cols[i], offset, length);
- }
- }
-
- private static void writeFields(StructColumnVector vector,
- TreeWriter[] childrenWriters,
- int offset, int length) throws IOException {
- for(int field=0; field < childrenWriters.length; ++field) {
- childrenWriters[field].writeBatch(vector.fields[field], offset, length);
- }
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- StructColumnVector vec = (StructColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- writeFields(vec, childrenWriters, offset, length);
- }
- } else if (vector.noNulls) {
- writeFields(vec, childrenWriters, offset, length);
- } else {
- // write the records in runs
- int currentRun = 0;
- boolean started = false;
- for(int i=0; i < length; ++i) {
- if (!vec.isNull[i + offset]) {
- if (!started) {
- started = true;
- currentRun = i;
- }
- } else if (started) {
- started = false;
- writeFields(vec, childrenWriters, offset + currentRun,
- i - currentRun);
- }
- }
- if (started) {
- writeFields(vec, childrenWriters, offset + currentRun,
- length - currentRun);
- }
- }
- }
-
- @Override
- public void createRowIndexEntry() throws IOException {
- super.createRowIndexEntry();
- for(TreeWriter child: childrenWriters) {
- child.createRowIndexEntry();
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- for(TreeWriter child: childrenWriters) {
- child.writeStripe(builder, stats, requiredIndexEntries);
- }
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- public void updateFileStatistics(OrcProto.StripeStatistics stats) {
- super.updateFileStatistics(stats);
- for(TreeWriter child: childrenWriters) {
- child.updateFileStatistics(stats);
- }
- }
-
- @Override
- public long estimateMemory() {
- long result = 0;
- for(TreeWriter writer: childrenWriters) {
- result += writer.estimateMemory();
- }
- return super.estimateMemory() + result;
- }
-
- @Override
- public long getRawDataSize() {
- long result = 0;
- for(TreeWriter writer: childrenWriters) {
- result += writer.getRawDataSize();
- }
- return result;
- }
-
- @Override
- public void writeFileStatistics(OrcProto.Footer.Builder footer) {
- super.writeFileStatistics(footer);
- for(TreeWriter child: childrenWriters) {
- child.writeFileStatistics(footer);
- }
- }
- }
-
- private static class ListTreeWriter extends TreeWriterBase {
- private final IntegerWriter lengths;
- private final boolean isDirectV2;
- private final TreeWriter childWriter;
-
- ListTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- childWriter =
- createTreeWriter(schema.getChildren().get(0), writer, true);
- lengths = createIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public void createRowIndexEntry() throws IOException {
- super.createRowIndexEntry();
- childWriter.createRowIndexEntry();
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- ListColumnVector vec = (ListColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int childOffset = (int) vec.offsets[0];
- int childLength = (int) vec.lengths[0];
- for(int i=0; i < length; ++i) {
- lengths.write(childLength);
- childWriter.writeBatch(vec.child, childOffset, childLength);
- }
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(childLength);
- }
- bloomFilterUtf8.addLong(childLength);
- }
- }
- } else {
- // write the elements in runs
- int currentOffset = 0;
- int currentLength = 0;
- for(int i=0; i < length; ++i) {
- if (!vec.isNull[i + offset]) {
- int nextLength = (int) vec.lengths[offset + i];
- int nextOffset = (int) vec.offsets[offset + i];
- lengths.write(nextLength);
- if (currentLength == 0) {
- currentOffset = nextOffset;
- currentLength = nextLength;
- } else if (currentOffset + currentLength != nextOffset) {
- childWriter.writeBatch(vec.child, currentOffset,
- currentLength);
- currentOffset = nextOffset;
- currentLength = nextLength;
- } else {
- currentLength += nextLength;
- }
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(nextLength);
- }
- bloomFilterUtf8.addLong(nextLength);
- }
+ CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
+ // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
+ // but at this point there's no close() for the stream.
+ CompressionCodec result = physicalWriter.getCompressionCodec();
+ if (result != null) {
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ case BLOOM_FILTER_UTF8:
+ if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+ CompressionCodec.Modifier.TEXT));
+ } else {
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+ CompressionCodec.Modifier.TEXT));
}
- }
- if (currentLength != 0) {
- childWriter.writeBatch(vec.child, currentOffset,
- currentLength);
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- lengths.flush();
- childWriter.writeStripe(builder, stats, requiredIndexEntries);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
+ break;
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+ CompressionCodec.Modifier.BINARY));
+ break;
+ default:
+ LOG.info("Missing ORC compression modifiers for " + kind);
+ break;
}
}
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- lengths.getPosition(recorder);
- }
-
- @Override
- public void updateFileStatistics(OrcProto.StripeStatistics stats) {
- super.updateFileStatistics(stats);
- childWriter.updateFileStatistics(stats);
- }
-
- @Override
- public long estimateMemory() {
- return super.estimateMemory() + lengths.estimateMemory() +
- childWriter.estimateMemory();
- }
-
- @Override
- public long getRawDataSize() {
- return childWriter.getRawDataSize();
- }
-
- @Override
- public void writeFileStatistics(OrcProto.Footer.Builder footer) {
- super.writeFileStatistics(footer);
- childWriter.writeFileStatistics(footer);
- }
+ return result;
}
- private static class MapTreeWriter extends TreeWriterBase {
- private final IntegerWriter lengths;
- private final boolean isDirectV2;
- private final TreeWriter keyWriter;
- private final TreeWriter valueWriter;
-
- MapTreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- List<TypeDescription> children = schema.getChildren();
- keyWriter = createTreeWriter(children.get(0), writer, true);
- valueWriter = createTreeWriter(children.get(1), writer, true);
- lengths = createIntegerWriter(writer.createStream(columnId,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- OrcProto.ColumnEncoding.Builder getEncoding() {
- OrcProto.ColumnEncoding.Builder result = super.getEncoding();
- if (isDirectV2) {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2);
- } else {
- result.setKind(OrcProto.ColumnEncoding.Kind.DIRECT);
- }
- return result;
- }
-
- @Override
- public void createRowIndexEntry() throws IOException {
- super.createRowIndexEntry();
- keyWriter.createRowIndexEntry();
- valueWriter.createRowIndexEntry();
- }
-
- @Override
- public void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- MapColumnVector vec = (MapColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int childOffset = (int) vec.offsets[0];
- int childLength = (int) vec.lengths[0];
- for(int i=0; i < length; ++i) {
- lengths.write(childLength);
- keyWriter.writeBatch(vec.keys, childOffset, childLength);
- valueWriter.writeBatch(vec.values, childOffset, childLength);
- }
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(childLength);
- }
- bloomFilterUtf8.addLong(childLength);
- }
- }
- } else {
- // write the elements in runs
- int currentOffset = 0;
- int currentLength = 0;
- for(int i=0; i < length; ++i) {
- if (!vec.isNull[i + offset]) {
- int nextLength = (int) vec.lengths[offset + i];
- int nextOffset = (int) vec.offsets[offset + i];
- lengths.write(nextLength);
- if (currentLength == 0) {
- currentOffset = nextOffset;
- currentLength = nextLength;
- } else if (currentOffset + currentLength != nextOffset) {
- keyWriter.writeBatch(vec.keys, currentOffset,
- currentLength);
- valueWriter.writeBatch(vec.values, currentOffset,
- currentLength);
- currentOffset = nextOffset;
- currentLength = nextLength;
- } else {
- currentLength += nextLength;
- }
- if (createBloomFilter) {
- if (bloomFilter != null) {
- bloomFilter.addLong(nextLength);
- }
- bloomFilterUtf8.addLong(nextLength);
- }
- }
- }
- if (currentLength != 0) {
- keyWriter.writeBatch(vec.keys, currentOffset,
- currentLength);
- valueWriter.writeBatch(vec.values, currentOffset,
- currentLength);
- }
- }
- }
-
- @Override
- public void writeStripe(OrcProto.StripeFooter.Builder builder,
- OrcProto.StripeStatistics.Builder stats,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, stats, requiredIndexEntries);
- lengths.flush();
- keyWriter.writeStripe(builder, stats, requiredIndexEntries);
- valueWriter.writeStripe(builder, stats, requiredIndexEntries);
- if (rowIndexPosition != null) {
- recordPosition(rowIndexPosition);
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- lengths.getPosition(recorder);
- }
-
- @Override
- public void updateFileStatistics(OrcProto.StripeStatistics stats) {
- super.updateFileStatistics(stats);
- keyWriter.updateFileStatistics(stats);
- valueWriter.updateFileStatistics(stats);
- }
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory implements WriterContext {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return The output outStream that the section needs to be written to.
+ */
+ public OutStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ final StreamName name = new StreamName(column, kind);
+ CompressionCodec codec = getCustomizedCodec(kind);
- @Ov
<TRUNCATED>