You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@orc.apache.org by om...@apache.org on 2017/05/18 15:20:42 UTC
orc git commit: ORC-193. Refactor TreeWriter API in WriterImpl.
Repository: orc
Updated Branches:
refs/heads/master 3ec52c0da -> cea20ac5d
ORC-193. Refactor TreeWriter API in WriterImpl.
Fixes #124
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/cea20ac5
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/cea20ac5
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/cea20ac5
Branch: refs/heads/master
Commit: cea20ac5db4bb6ec42df20af55e1edb4e23e4b1b
Parents: 3ec52c0
Author: Owen O'Malley <om...@apache.org>
Authored: Wed May 17 13:52:12 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Thu May 18 08:19:49 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/orc/impl/WriterImpl.java | 859 ++++++++++---------
.../test/org/apache/orc/tools/TestFileDump.java | 4 +-
2 files changed, 473 insertions(+), 390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/cea20ac5/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 32820e1..cfdddad 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -100,12 +100,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private static final int MIN_ROW_INDEX_STRIDE = 1000;
private final Path path;
- private final long defaultStripeSize;
private long adjustedStripeSize;
private final int rowIndexStride;
private final CompressionKind compress;
private int bufferSize;
- private final long blockSize;
private final TypeDescription schema;
private final PhysicalWriter physicalWriter;
private final OrcFile.WriterVersion writerVersion;
@@ -118,10 +116,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private long lastFlushOffset = 0;
private int stripesAtLastFlush = -1;
private final List<OrcProto.StripeInformation> stripes =
- new ArrayList<OrcProto.StripeInformation>();
+ new ArrayList<>();
+ private final OrcProto.Metadata.Builder fileMetadata =
+ OrcProto.Metadata.newBuilder();
private final Map<String, ByteString> userMetadata =
- new TreeMap<String, ByteString>();
- private final StreamFactory streamFactory = new StreamFactory();
+ new TreeMap<>();
private final TreeWriter treeWriter;
private final boolean buildIndex;
private final MemoryManager memoryManager;
@@ -157,11 +156,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
callbackContext = null;
}
this.adjustedStripeSize = opts.getStripeSize();
- this.defaultStripeSize = opts.getStripeSize();
this.version = opts.getVersion();
this.encodingStrategy = opts.getEncodingStrategy();
this.compressionStrategy = opts.getCompressionStrategy();
- this.blockSize = opts.getBlockSize();
this.compress = opts.getCompress();
this.rowIndexStride = opts.getRowIndexStride();
this.memoryManager = opts.getMemoryManager();
@@ -170,7 +167,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
if (opts.isEnforceBufferSize()) {
this.bufferSize = opts.getBufferSize();
} else {
- this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
+ this.bufferSize = getEstimatedBufferSize(adjustedStripeSize,
numColumns, opts.getBufferSize());
}
if (version == OrcFile.Version.V_0_11) {
@@ -184,7 +181,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
this.physicalWriter = opts.getPhysicalWriter() == null ?
new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
physicalWriter.writeHeader();
- treeWriter = createTreeWriter(schema, streamFactory, false);
+ treeWriter = createTreeWriter(schema, new StreamFactory(), false);
if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
throw new IllegalArgumentException("Row stride must be at least " +
MIN_ROW_INDEX_STRIDE);
@@ -193,7 +190,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// ensure that we are able to handle callbacks before we register ourselves
memoryManager.addWriter(path, opts.getStripeSize(), this);
LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
- " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize,
+ " compression: {} bufferSize: {}", path, adjustedStripeSize, opts.getBlockSize(),
compress, bufferSize);
}
@@ -268,7 +265,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
@Override
public boolean checkMemory(double newScale) throws IOException {
- long limit = (long) Math.round(adjustedStripeSize * newScale);
+ long limit = Math.round(adjustedStripeSize * newScale);
long size = treeWriter.estimateMemory();
if (LOG.isDebugEnabled()) {
LOG.debug("ORC writer " + physicalWriter + " size = " + size +
@@ -339,7 +336,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* @param column the column id for the stream
* @param kind the kind of stream
* @return The output outStream that the section needs to be written to.
- * @throws IOException
*/
public OutStream createStream(int column,
OrcProto.Stream.Kind kind
@@ -391,14 +387,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
/**
- * Get the compression strategy to use.
- * @return compression strategy
- */
- public OrcFile.CompressionStrategy getCompressionStrategy() {
- return compressionStrategy;
- }
-
- /**
* Get the bloom filter columns
* @return bloom filter columns
*/
@@ -455,19 +443,89 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
/**
+ * The common interface for the type-specific column writers. This provides
+ * the generic API that they must all implement.
+ */
+ interface TreeWriter {
+
+ /**
+ * Estimate the memory currently used to buffer the stripe.
+ * @return the number of bytes
+ */
+ long estimateMemory();
+
+ /**
+ * Estimate the memory used if the file was read into Hive's Writable
+ * types. This is used as an estimate for the query optimizer.
+ * @return the number of bytes
+ */
+ long getRawDataSize();
+
+ /**
+ * Write a VectorizedRowBatch to the file. This is called by the WriterImpl
+ * at the top level.
+ * @param batch the list of all of the columns
+ * @param offset the first row from the batch to write
+ * @param length the number of rows to write
+ */
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException;
+
+ /**
+ * Write a ColumnVector to the file. This is called recursively by
+ * writeRootBatch.
+ * @param vector the data to write
+ * @param offset the first value offset to write.
+ * @param length the number of values to write
+ */
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException;
+
+ /**
+ * Create a row index entry at the current point in the stripe.
+ */
+ void createRowIndexEntry() throws IOException;
+
+ /**
+ * Write the stripe out to the file.
+ * @param stripeFooter the stripe footer that contains the information about the
+ * layout of the stripe. Each implementation is required to update
+ * the footer with its information.
+ * @param stats the stripe statistics builder to add this column's statistics to
+ * @param requiredIndexEntries the number of index entries that are
+ * required. This is to check to make sure the
+ * row index is well formed.
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder stripeFooter,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException;
+
+ /**
+ * During a stripe append, we need to update the file statistics.
+ * @param stripeStatistics the statistics for the new stripe
+ */
+ void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics);
+
+ /**
+ * Add the file statistics to the file footer.
+ * @param footer the file footer builder
+ */
+ void writeFileStatistics(OrcProto.Footer.Builder footer);
+ }
+
+ /**
* The parent class of all of the writers for each column. Each column
* is written by an instance of this class. The compound types (struct,
* list, map, and union) have children tree writers that write the children
* types.
*/
- private abstract static class TreeWriter {
+ private abstract static class TreeWriterBase implements TreeWriter {
protected final int id;
protected final BitFieldWriter isPresent;
private final boolean isCompressed;
protected final ColumnStatisticsImpl indexStatistics;
protected final ColumnStatisticsImpl stripeColStatistics;
- private final ColumnStatisticsImpl fileStatistics;
- protected TreeWriter[] childrenWriters;
+ protected final ColumnStatisticsImpl fileStatistics;
protected final RowIndexPositionRecorder rowIndexPosition;
private final OrcProto.RowIndex.Builder rowIndex;
private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
@@ -479,7 +537,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
protected final OrcProto.BloomFilter.Builder bloomFilterEntry;
private boolean foundNulls;
private OutStream isPresentOutStream;
- private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
private final StreamFactory streamFactory;
/**
@@ -488,12 +545,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* @param schema the row schema
* @param streamFactory limited access to the Writer's data.
* @param nullable can the value be null?
- * @throws IOException
*/
- TreeWriter(int columnId,
- TypeDescription schema,
- StreamFactory streamFactory,
- boolean nullable) throws IOException {
+ TreeWriterBase(int columnId,
+ TypeDescription schema,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
this.streamFactory = streamFactory;
this.isCompressed = streamFactory.isCompressed();
this.id = columnId;
@@ -509,7 +565,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
indexStatistics = ColumnStatisticsImpl.create(schema);
stripeColStatistics = ColumnStatisticsImpl.create(schema);
fileStatistics = ColumnStatisticsImpl.create(schema);
- childrenWriters = new TreeWriter[0];
if (streamFactory.buildIndex()) {
rowIndex = OrcProto.RowIndex.newBuilder();
rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
@@ -519,7 +574,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
rowIndexEntry = null;
rowIndexPosition = null;
}
- stripeStatsBuilders = new ArrayList<>();
if (createBloomFilter) {
bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) {
@@ -581,10 +635,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* @param batch the batch to write from
* @param offset the row to start on
* @param length the number of rows to write
- * @throws IOException
*/
- void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException {
+ public void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
writeBatch(batch.cols[0], offset, length);
}
@@ -593,10 +646,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* @param vector the vector to write from
* @param offset the first value from the vector to write
* @param length the number of values from the vector to write
- * @throws IOException
*/
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
if (vector.noNulls) {
indexStatistics.increment(length);
if (isPresent != null) {
@@ -650,18 +703,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- /**
- * Write the stripe out to the file.
- * @param builder the stripe footer that contains the information about the
- * layout of the stripe. The TreeWriter is required to update
- * the footer with its information.
- * @param requiredIndexEntries the number of index entries that are
- * required. this is to check to make sure the
- * row index is well formed.
- * @throws IOException
- */
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
if (isPresent != null) {
isPresent.flush();
@@ -678,9 +722,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// merge stripe-level column statistics to file statistics and write it to
// stripe statistics
- OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
- writeStripeStatistics(stripeStatsBuilder, this);
- stripeStatsBuilders.add(stripeStatsBuilder);
+ fileStatistics.merge(stripeColStatistics);
+ stats.addColStats(stripeColStatistics.serialize());
+ stripeColStatistics.reset();
// reset the flag for next stripe
foundNulls = false;
@@ -714,20 +758,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
- TreeWriter treeWriter) {
- treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
- builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
- treeWriter.stripeColStatistics.reset();
- for (TreeWriter child : treeWriter.getChildrenWriters()) {
- writeStripeStatistics(builder, child);
- }
- }
-
- TreeWriter[] getChildrenWriters() {
- return childrenWriters;
- }
-
/**
* Get the encoding for this column.
* @return the information about the encoding of this column
@@ -747,9 +777,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* index statistics. Also merges the index statistics into the file
* statistics before they are cleared. Finally, it records the start of the
* next index and ensures all of the children columns also create an entry.
- * @throws IOException
*/
- void createRowIndexEntry() throws IOException {
+ public void createRowIndexEntry() throws IOException {
stripeColStatistics.merge(indexStatistics);
rowIndexEntry.setStatistics(indexStatistics.serialize());
indexStatistics.reset();
@@ -757,9 +786,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
rowIndexEntry.clear();
addBloomFilterEntry();
recordPosition(rowIndexPosition);
- for(TreeWriter child: childrenWriters) {
- child.createRowIndexEntry();
- }
}
void addBloomFilterEntry() {
@@ -777,10 +803,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
+ @Override
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ fileStatistics.merge(ColumnStatisticsImpl.deserialize(stats.getColStats(id)));
+ }
+
/**
* Record the current position in each of this column's streams.
* @param recorder where should the locations be recorded
- * @throws IOException
*/
void recordPosition(PositionRecorder recorder) throws IOException {
if (isPresent != null) {
@@ -792,19 +822,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* Estimate how much memory the writer is consuming excluding the streams.
* @return the number of bytes.
*/
- long estimateMemory() {
+ public long estimateMemory() {
long result = 0;
if (isPresent != null) {
result = isPresentOutStream.getBufferSize();
}
- for (TreeWriter child: childrenWriters) {
- result += child.estimateMemory();
- }
return result;
}
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+ footer.addStatistics(fileStatistics.serialize());
+ }
}
- private static class BooleanTreeWriter extends TreeWriter {
+ private static class BooleanTreeWriter extends TreeWriterBase {
private final BitFieldWriter writer;
BooleanTreeWriter(int columnId,
@@ -821,8 +853,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
LongColumnVector vec = (LongColumnVector) vector;
if (vector.isRepeating) {
@@ -845,9 +877,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
writer.flush();
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -861,12 +894,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + writer.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() {
+ long num = fileStatistics.getNumberOfValues();
+ return num * JavaDataModel.get().primitive1();
+ }
}
- private static class ByteTreeWriter extends TreeWriter {
+ private static class ByteTreeWriter extends TreeWriterBase {
private final RunLengthByteWriter writer;
ByteTreeWriter(int columnId,
@@ -882,8 +921,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
LongColumnVector vec = (LongColumnVector) vector;
if (vector.isRepeating) {
@@ -918,9 +957,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
writer.flush();
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -934,14 +974,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + writer.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() {
+ long num = fileStatistics.getNumberOfValues();
+ return num * JavaDataModel.get().primitive1();
+ }
}
- private static class IntegerTreeWriter extends TreeWriter {
+ private static class IntegerTreeWriter extends TreeWriterBase {
private final IntegerWriter writer;
private boolean isDirectV2 = true;
+ private final boolean isLong;
IntegerTreeWriter(int columnId,
TypeDescription schema,
@@ -955,6 +1002,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
}
+ this.isLong = schema.getCategory() == TypeDescription.Category.LONG;
}
@Override
@@ -969,8 +1017,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
LongColumnVector vec = (LongColumnVector) vector;
if (vector.isRepeating) {
@@ -1005,9 +1053,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
writer.flush();
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -1021,12 +1070,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + writer.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() {
+ JavaDataModel jdm = JavaDataModel.get();
+ long num = fileStatistics.getNumberOfValues();
+ return num * (isLong ? jdm.primitive2() : jdm.primitive1());
+ }
}
- private static class FloatTreeWriter extends TreeWriter {
+ private static class FloatTreeWriter extends TreeWriterBase {
private final PositionedOutputStream stream;
private final SerializationUtils utils;
@@ -1044,8 +1100,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
DoubleColumnVector vec = (DoubleColumnVector) vector;
if (vector.isRepeating) {
@@ -1081,9 +1137,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
stream.flush();
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -1097,12 +1154,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + stream.getBufferSize();
}
+
+ @Override
+ public long getRawDataSize() {
+ long num = fileStatistics.getNumberOfValues();
+ return num * JavaDataModel.get().primitive1();
+ }
}
- private static class DoubleTreeWriter extends TreeWriter {
+ private static class DoubleTreeWriter extends TreeWriterBase {
private final PositionedOutputStream stream;
private final SerializationUtils utils;
@@ -1120,8 +1183,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
DoubleColumnVector vec = (DoubleColumnVector) vector;
if (vector.isRepeating) {
@@ -1156,9 +1219,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
stream.flush();
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -1172,12 +1236,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + stream.getBufferSize();
}
+
+ @Override
+ public long getRawDataSize() {
+ long num = fileStatistics.getNumberOfValues();
+ return num * JavaDataModel.get().primitive2();
+ }
}
- private static abstract class StringBaseTreeWriter extends TreeWriter {
+ private static abstract class StringBaseTreeWriter extends TreeWriterBase {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final OutStream stringOutput;
protected final IntegerWriter lengthOutput;
@@ -1187,9 +1257,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
protected final DynamicIntArray rows = new DynamicIntArray();
protected final PositionedOutputStream directStreamOutput;
private final List<OrcProto.RowIndexEntry> savedRowIndex =
- new ArrayList<OrcProto.RowIndexEntry>();
+ new ArrayList<>();
private final boolean buildIndex;
- private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+ private final List<Long> rowIndexValueCount = new ArrayList<>();
// If the number of keys in a dictionary is greater than this fraction of
//the total number of non-null rows, turn off dictionary encoding
private final double dictionaryKeySizeThreshold;
@@ -1224,7 +1294,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
doneDictionaryCheck = false;
}
- private boolean checkDictionaryEncoding() {
+ private void checkDictionaryEncoding() {
if (!doneDictionaryCheck) {
// Set the flag indicating whether or not to use dictionary encoding
// based on whether or not the fraction of distinct keys over number of
@@ -1233,12 +1303,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
doneDictionaryCheck = true;
}
- return useDictionaryEncoding;
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
// if rows in stripe is less than dictionaryCheckAfterRows, dictionary
// checking would not have happened. So do it again here.
checkDictionaryEncoding();
@@ -1257,7 +1327,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// we need to build the rowindex before calling super, since it
// writes it out.
- super.writeStripe(builder, requiredIndexEntries);
+ super.writeStripe(builder, stats, requiredIndexEntries);
if (useDictionaryEncoding) {
stringOutput.flush();
lengthOutput.flush();
@@ -1365,10 +1435,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
* other TreeWriters, this one can't record the position in the streams
* until the stripe is being flushed. Therefore it saves all of the entries
* and augments them with the final information as the stripe is written.
- * @throws IOException
*/
@Override
- void createRowIndexEntry() throws IOException {
+ public void createRowIndexEntry() throws IOException {
getStripeStatistics().merge(indexStatistics);
OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
rowIndexEntry.setStatistics(indexStatistics.serialize());
@@ -1378,7 +1447,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
rowIndexEntry.clear();
addBloomFilterEntry();
recordPosition(rowIndexPosition);
- rowIndexValueCount.add(Long.valueOf(rows.size()));
+ rowIndexValueCount.add((long) rows.size());
if (strideDictionaryCheck) {
checkDictionaryEncoding();
}
@@ -1403,7 +1472,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
long parent = super.estimateMemory();
if (useDictionaryEncoding) {
return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes();
@@ -1412,6 +1481,20 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
directStreamOutput.getBufferSize();
}
}
+
+ @Override
+ public long getRawDataSize() {
+ // ORC strings are converted to java Strings. so use JavaDataModel to
+ // compute the overall size of strings
+ StringColumnStatistics scs = (StringColumnStatistics) fileStatistics;
+ long numVals = fileStatistics.getNumberOfValues();
+ if (numVals == 0) {
+ return 0;
+ } else {
+ int avgSize = (int) (scs.getSum() / numVals);
+ return numVals * JavaDataModel.get().lengthForStringOfLength(avgSize);
+ }
+ }
}
private static class StringTreeWriter extends StringBaseTreeWriter {
@@ -1423,8 +1506,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
BytesColumnVector vec = (BytesColumnVector) vector;
if (vector.isRepeating) {
@@ -1498,8 +1581,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
BytesColumnVector vec = (BytesColumnVector) vector;
if (vector.isRepeating) {
@@ -1591,8 +1674,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
BytesColumnVector vec = (BytesColumnVector) vector;
if (vector.isRepeating) {
@@ -1653,7 +1736,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- private static class BinaryTreeWriter extends TreeWriter {
+ private static class BinaryTreeWriter extends TreeWriterBase {
private final PositionedOutputStream stream;
private final IntegerWriter length;
private boolean isDirectV2 = true;
@@ -1685,8 +1768,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
BytesColumnVector vec = (BytesColumnVector) vector;
if (vector.isRepeating) {
@@ -1728,9 +1811,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
stream.flush();
length.flush();
if (rowIndexPosition != null) {
@@ -1746,16 +1830,23 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + stream.getBufferSize() +
length.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() {
+ // get total length of binary blob
+ BinaryColumnStatistics bcs = (BinaryColumnStatistics) fileStatistics;
+ return bcs.getSum();
+ }
}
public static final int MILLIS_PER_SECOND = 1000;
public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
- private static class TimestampTreeWriter extends TreeWriter {
+ private static class TimestampTreeWriter extends TreeWriterBase {
private final IntegerWriter seconds;
private final IntegerWriter nanos;
private final boolean isDirectV2;
@@ -1793,8 +1884,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
TimestampColumnVector vec = (TimestampColumnVector) vector;
Timestamp val;
@@ -1839,9 +1930,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
seconds.flush();
nanos.flush();
if (rowIndexPosition != null) {
@@ -1873,13 +1965,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + seconds.estimateMemory() +
nanos.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() { // value count from fileStatistics x JavaDataModel timestamp size
+ return fileStatistics.getNumberOfValues() *
+ JavaDataModel.get().lengthOfTimestamp();
+ }
}
- private static class DateTreeWriter extends TreeWriter {
+ private static class DateTreeWriter extends TreeWriterBase {
private final IntegerWriter writer;
private final boolean isDirectV2;
@@ -1898,8 +1996,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
LongColumnVector vec = (LongColumnVector) vector;
if (vector.isRepeating) {
@@ -1934,9 +2032,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
writer.flush();
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -1961,12 +2060,18 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + writer.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() { // value count from fileStatistics x JavaDataModel date size
+ return fileStatistics.getNumberOfValues() *
+ JavaDataModel.get().lengthOfDate();
+ }
}
- private static class DecimalTreeWriter extends TreeWriter {
+ private static class DecimalTreeWriter extends TreeWriterBase {
private final PositionedOutputStream valueStream;
// These scratch buffers allow us to serialize decimals much faster.
@@ -2004,8 +2109,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
DecimalColumnVector vec = (DecimalColumnVector) vector;
if (vector.isRepeating) {
@@ -2045,9 +2150,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
valueStream.flush();
scaleStream.flush();
if (rowIndexPosition != null) {
@@ -2063,20 +2169,28 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
+ public long estimateMemory() {
return super.estimateMemory() + valueStream.getBufferSize() +
scaleStream.estimateMemory();
}
+
+ @Override
+ public long getRawDataSize() { // value count from fileStatistics x JavaDataModel decimal size
+ return fileStatistics.getNumberOfValues() *
+ JavaDataModel.get().lengthOfDecimal();
+ }
}
- private static class StructTreeWriter extends TreeWriter {
+ private static class StructTreeWriter extends TreeWriterBase {
+ private final TreeWriter[] childrenWriters; // private, like the child writers of List/Map/Union
+
StructTreeWriter(int columnId,
TypeDescription schema,
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, schema, writer, nullable);
List<TypeDescription> children = schema.getChildren();
- childrenWriters = new TreeWriter[children.size()];
+ childrenWriters = new TreeWriter[children.size()]; // declared element type; a TreeWriterBase[] would throw ArrayStoreException for any non-TreeWriterBase child
for(int i=0; i < childrenWriters.length; ++i) {
childrenWriters[i] = createTreeWriter(
children.get(i), writer,
@@ -2088,8 +2202,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException {
+ public void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
// update the statistics for the root column
indexStatistics.increment(length);
// I'm assuming that the root column isn't nullable so that I don't need
@@ -2108,8 +2222,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
StructColumnVector vec = (StructColumnVector) vector;
if (vector.isRepeating) {
@@ -2142,21 +2256,65 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void createRowIndexEntry() throws IOException { // also starts a new index entry in every child column
+ super.createRowIndexEntry();
+ for(TreeWriter child: childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
for(TreeWriter child: childrenWriters) {
- child.writeStripe(builder, requiredIndexEntries);
+ child.writeStripe(builder, stats, requiredIndexEntries);
}
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
}
}
+
+ @Override
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) { // this column first, then each child
+ super.updateFileStatistics(stats);
+ for(TreeWriter child: childrenWriters) {
+ child.updateFileStatistics(stats);
+ }
+ }
+
+ @Override
+ public long estimateMemory() { // includes every child writer's estimate
+ long result = 0;
+ for(TreeWriter writer: childrenWriters) {
+ result += writer.estimateMemory();
+ }
+ return super.estimateMemory() + result;
+ }
+
+ @Override
+ public long getRawDataSize() { // a struct's raw size is the sum of its children's
+ long result = 0;
+ for(TreeWriter writer: childrenWriters) {
+ result += writer.getRawDataSize();
+ }
+ return result;
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) { // own statistics, then each child in order
+ super.writeFileStatistics(footer);
+ for(TreeWriter child: childrenWriters) {
+ child.writeFileStatistics(footer);
+ }
+ }
}
- private static class ListTreeWriter extends TreeWriter {
+ private static class ListTreeWriter extends TreeWriterBase {
private final IntegerWriter lengths;
private final boolean isDirectV2;
+ private final TreeWriter childWriter;
ListTreeWriter(int columnId,
TypeDescription schema,
@@ -2164,8 +2322,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
- childrenWriters = new TreeWriter[1];
- childrenWriters[0] =
+ childWriter = // the single element-type writer
createTreeWriter(schema.getChildren().get(0), writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
@@ -2186,8 +2343,14 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void createRowIndexEntry() throws IOException { // also starts a new index entry in the element column
+ super.createRowIndexEntry();
+ childWriter.createRowIndexEntry();
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
ListColumnVector vec = (ListColumnVector) vector;
if (vector.isRepeating) {
@@ -2196,7 +2359,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int childLength = (int) vec.lengths[0];
for(int i=0; i < length; ++i) {
lengths.write(childLength);
- childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+ childWriter.writeBatch(vec.child, childOffset, childLength);
}
if (createBloomFilter) {
if (bloomFilter != null) {
@@ -2218,7 +2381,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
currentOffset = nextOffset;
currentLength = nextLength;
} else if (currentOffset + currentLength != nextOffset) {
- childrenWriters[0].writeBatch(vec.child, currentOffset,
+ childWriter.writeBatch(vec.child, currentOffset,
currentLength);
currentOffset = nextOffset;
currentLength = nextLength;
@@ -2234,20 +2397,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
if (currentLength != 0) {
- childrenWriters[0].writeBatch(vec.child, currentOffset,
+ childWriter.writeBatch(vec.child, currentOffset,
currentLength);
}
}
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
lengths.flush();
- for(TreeWriter child: childrenWriters) {
- child.writeStripe(builder, requiredIndexEntries);
- }
+ childWriter.writeStripe(builder, stats, requiredIndexEntries);
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
}
@@ -2260,14 +2422,34 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
- return super.estimateMemory() + lengths.estimateMemory();
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ super.updateFileStatistics(stats);
+ childWriter.updateFileStatistics(stats);
+ }
+
+ @Override
+ public long estimateMemory() { // own buffers, the lengths writer, and the element writer
+ return super.estimateMemory() + lengths.estimateMemory() +
+ childWriter.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() { // a list's raw size is its element writer's raw size
+ return childWriter.getRawDataSize();
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+ super.writeFileStatistics(footer);
+ childWriter.writeFileStatistics(footer);
}
}
- private static class MapTreeWriter extends TreeWriter {
+ private static class MapTreeWriter extends TreeWriterBase {
private final IntegerWriter lengths;
private final boolean isDirectV2;
+ private final TreeWriter keyWriter;
+ private final TreeWriter valueWriter;
MapTreeWriter(int columnId,
TypeDescription schema,
@@ -2275,12 +2457,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
- childrenWriters = new TreeWriter[2];
List<TypeDescription> children = schema.getChildren();
- childrenWriters[0] =
- createTreeWriter(children.get(0), writer, true);
- childrenWriters[1] =
- createTreeWriter(children.get(1), writer, true);
+ keyWriter = createTreeWriter(children.get(0), writer, true); // child 0 receives vec.keys
+ valueWriter = createTreeWriter(children.get(1), writer, true); // child 1 receives vec.values
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
if (rowIndexPosition != null) {
@@ -2300,8 +2479,15 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void createRowIndexEntry() throws IOException { // also starts new index entries for keys and values
+ super.createRowIndexEntry();
+ keyWriter.createRowIndexEntry();
+ valueWriter.createRowIndexEntry();
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
MapColumnVector vec = (MapColumnVector) vector;
if (vector.isRepeating) {
@@ -2310,8 +2496,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
int childLength = (int) vec.lengths[0];
for(int i=0; i < length; ++i) {
lengths.write(childLength);
- childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
- childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+ keyWriter.writeBatch(vec.keys, childOffset, childLength);
+ valueWriter.writeBatch(vec.values, childOffset, childLength);
}
if (createBloomFilter) {
if (bloomFilter != null) {
@@ -2333,9 +2519,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
currentOffset = nextOffset;
currentLength = nextLength;
} else if (currentOffset + currentLength != nextOffset) {
- childrenWriters[0].writeBatch(vec.keys, currentOffset,
+ keyWriter.writeBatch(vec.keys, currentOffset,
currentLength);
- childrenWriters[1].writeBatch(vec.values, currentOffset,
+ valueWriter.writeBatch(vec.values, currentOffset,
currentLength);
currentOffset = nextOffset;
currentLength = nextLength;
@@ -2351,22 +2537,22 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
if (currentLength != 0) {
- childrenWriters[0].writeBatch(vec.keys, currentOffset,
+ keyWriter.writeBatch(vec.keys, currentOffset,
currentLength);
- childrenWriters[1].writeBatch(vec.values, currentOffset,
+ valueWriter.writeBatch(vec.values, currentOffset,
currentLength);
}
}
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
lengths.flush();
- for(TreeWriter child: childrenWriters) {
- child.writeStripe(builder, requiredIndexEntries);
- }
+ keyWriter.writeStripe(builder, stats, requiredIndexEntries);
+ valueWriter.writeStripe(builder, stats, requiredIndexEntries);
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
}
@@ -2379,13 +2565,34 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
- return super.estimateMemory() + lengths.estimateMemory();
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ super.updateFileStatistics(stats);
+ keyWriter.updateFileStatistics(stats);
+ valueWriter.updateFileStatistics(stats);
+ }
+
+ @Override
+ public long estimateMemory() { // own buffers, lengths writer, key and value writers
+ return super.estimateMemory() + lengths.estimateMemory() +
+ keyWriter.estimateMemory() + valueWriter.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() { // sum of key and value raw sizes
+ return keyWriter.getRawDataSize() + valueWriter.getRawDataSize();
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) { // own statistics, then keys, then values
+ super.writeFileStatistics(footer);
+ keyWriter.writeFileStatistics(footer);
+ valueWriter.writeFileStatistics(footer);
}
}
- private static class UnionTreeWriter extends TreeWriter {
+ private static class UnionTreeWriter extends TreeWriterBase {
private final RunLengthByteWriter tags;
+ private final TreeWriter[] childrenWriters;
UnionTreeWriter(int columnId,
TypeDescription schema,
@@ -2393,7 +2600,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, schema, writer, nullable);
List<TypeDescription> children = schema.getChildren();
- childrenWriters = new TreeWriter[children.size()];
+ childrenWriters = new TreeWriter[children.size()]; // declared element type; a TreeWriterBase[] would throw ArrayStoreException for any non-TreeWriterBase child
for(int i=0; i < childrenWriters.length; ++i) {
childrenWriters[i] =
createTreeWriter(children.get(i), writer, true);
@@ -2407,8 +2614,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
super.writeBatch(vector, offset, length);
UnionColumnVector vec = (UnionColumnVector) vector;
if (vector.isRepeating) {
@@ -2468,12 +2675,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
+ public void createRowIndexEntry() throws IOException { // also starts a new index entry in every variant column
+ super.createRowIndexEntry();
+ for(TreeWriter child: childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
tags.flush();
for(TreeWriter child: childrenWriters) {
- child.writeStripe(builder, requiredIndexEntries);
+ child.writeStripe(builder, stats, requiredIndexEntries);
}
if (rowIndexPosition != null) {
recordPosition(rowIndexPosition);
@@ -2487,8 +2703,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
- long estimateMemory() {
- return super.estimateMemory() + tags.estimateMemory();
+ public void updateFileStatistics(OrcProto.StripeStatistics stats) {
+ super.updateFileStatistics(stats);
+ for(TreeWriter child: childrenWriters) {
+ child.updateFileStatistics(stats);
+ }
+ }
+
+ @Override
+ public long estimateMemory() { // children, own buffers, and the tags writer
+ long children = 0;
+ for(TreeWriter writer: childrenWriters) {
+ children += writer.estimateMemory();
+ }
+ return children + super.estimateMemory() + tags.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() { // sum of the variant writers' raw sizes
+ long result = 0;
+ for(TreeWriter writer: childrenWriters) {
+ result += writer.getRawDataSize();
+ }
+ return result;
+ }
+
+ @Override
+ public void writeFileStatistics(OrcProto.Footer.Builder footer) {
+ super.writeFileStatistics(footer);
+ for(TreeWriter child: childrenWriters) {
+ child.writeFileStatistics(footer);
+ }
}
}
@@ -2554,90 +2799,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private static void writeTypes(OrcProto.Footer.Builder builder,
TypeDescription schema) {
- OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- List<TypeDescription> children = schema.getChildren();
- switch (schema.getCategory()) {
- case BOOLEAN:
- type.setKind(OrcProto.Type.Kind.BOOLEAN);
- break;
- case BYTE:
- type.setKind(OrcProto.Type.Kind.BYTE);
- break;
- case SHORT:
- type.setKind(OrcProto.Type.Kind.SHORT);
- break;
- case INT:
- type.setKind(OrcProto.Type.Kind.INT);
- break;
- case LONG:
- type.setKind(OrcProto.Type.Kind.LONG);
- break;
- case FLOAT:
- type.setKind(OrcProto.Type.Kind.FLOAT);
- break;
- case DOUBLE:
- type.setKind(OrcProto.Type.Kind.DOUBLE);
- break;
- case STRING:
- type.setKind(OrcProto.Type.Kind.STRING);
- break;
- case CHAR:
- type.setKind(OrcProto.Type.Kind.CHAR);
- type.setMaximumLength(schema.getMaxLength());
- break;
- case VARCHAR:
- type.setKind(OrcProto.Type.Kind.VARCHAR);
- type.setMaximumLength(schema.getMaxLength());
- break;
- case BINARY:
- type.setKind(OrcProto.Type.Kind.BINARY);
- break;
- case TIMESTAMP:
- type.setKind(OrcProto.Type.Kind.TIMESTAMP);
- break;
- case DATE:
- type.setKind(OrcProto.Type.Kind.DATE);
- break;
- case DECIMAL:
- type.setKind(OrcProto.Type.Kind.DECIMAL);
- type.setPrecision(schema.getPrecision());
- type.setScale(schema.getScale());
- break;
- case LIST:
- type.setKind(OrcProto.Type.Kind.LIST);
- type.addSubtypes(children.get(0).getId());
- break;
- case MAP:
- type.setKind(OrcProto.Type.Kind.MAP);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- break;
- case STRUCT:
- type.setKind(OrcProto.Type.Kind.STRUCT);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- for(String field: schema.getFieldNames()) {
- type.addFieldNames(field);
- }
- break;
- case UNION:
- type.setKind(OrcProto.Type.Kind.UNION);
- for(TypeDescription t: children) {
- type.addSubtypes(t.getId());
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown category: " +
- schema.getCategory());
- }
- builder.addTypes(type);
- if (children != null) {
- for(TypeDescription child: children) {
- writeTypes(builder, child);
- }
- }
+ builder.addAllTypes(OrcUtils.getOrcTypes(schema)); // type serialization now delegated to OrcUtils.getOrcTypes
}
private void createRowIndexEntry() throws IOException {
@@ -2658,7 +2820,10 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
(int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
OrcProto.StripeFooter.Builder builder =
OrcProto.StripeFooter.newBuilder();
- treeWriter.writeStripe(builder, requiredIndexEntries);
+ OrcProto.StripeStatistics.Builder stats = // filled in by the tree writers during writeStripe
+ OrcProto.StripeStatistics.newBuilder();
+ treeWriter.writeStripe(builder, stats, requiredIndexEntries);
+ fileMetadata.addStripeStats(stats.build()); // keep this stripe's stats for writeMetadata
OrcProto.StripeInformation.Builder dirEntry =
OrcProto.StripeInformation.newBuilder()
.setNumberOfRows(rowsInStripe);
@@ -2670,58 +2835,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
private long computeRawDataSize() {
- return getRawDataSize(treeWriter, schema);
- }
-
- private long getRawDataSize(TreeWriter child,
- TypeDescription schema) {
- long total = 0;
- long numVals = child.fileStatistics.getNumberOfValues();
- switch (schema.getCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case FLOAT:
- return numVals * JavaDataModel.get().primitive1();
- case LONG:
- case DOUBLE:
- return numVals * JavaDataModel.get().primitive2();
- case STRING:
- case VARCHAR:
- case CHAR:
- // ORC strings are converted to java Strings. so use JavaDataModel to
- // compute the overall size of strings
- StringColumnStatistics scs = (StringColumnStatistics) child.fileStatistics;
- numVals = numVals == 0 ? 1 : numVals;
- int avgStringLen = (int) (scs.getSum() / numVals);
- return numVals * JavaDataModel.get().lengthForStringOfLength(avgStringLen);
- case DECIMAL:
- return numVals * JavaDataModel.get().lengthOfDecimal();
- case DATE:
- return numVals * JavaDataModel.get().lengthOfDate();
- case BINARY:
- // get total length of binary blob
- BinaryColumnStatistics bcs = (BinaryColumnStatistics) child.fileStatistics;
- return bcs.getSum();
- case TIMESTAMP:
- return numVals * JavaDataModel.get().lengthOfTimestamp();
- case LIST:
- case MAP:
- case UNION:
- case STRUCT: {
- TreeWriter[] childWriters = child.getChildrenWriters();
- List<TypeDescription> childTypes = schema.getChildren();
- for (int i=0; i < childWriters.length; ++i) {
- total += getRawDataSize(childWriters[i], childTypes.get(i));
- }
- break;
- }
- default:
- LOG.debug("Unknown object inspector category.");
- break;
- }
- return total;
+ return treeWriter.getRawDataSize(); // each writer type now reports its own raw size; composites sum their children
}
private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
@@ -2738,18 +2852,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private void writeFileStatistics(OrcProto.Footer.Builder builder,
TreeWriter writer) throws IOException {
- builder.addStatistics(writer.fileStatistics.serialize());
- for(TreeWriter child: writer.getChildrenWriters()) {
- writeFileStatistics(builder, child);
- }
+ writer.writeFileStatistics(builder); // composite writers emit their children's statistics after their own
}
private void writeMetadata() throws IOException {
- OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder();
- for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) {
- builder.addStripeStats(ssb.build());
- }
- physicalWriter.writeFileMetadata(builder);
+ physicalWriter.writeFileMetadata(fileMetadata); // stripe stats accumulated as each stripe was written or appended
}
private long writePostScript() throws IOException {
@@ -2899,10 +3006,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
dirEntry);
// since we have already written the stripe, just update stripe statistics
- treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder());
-
- // update file level statistics
- updateFileStatistics(stripeStatistics);
+ treeWriter.updateFileStatistics(stripeStatistics); // update file level statistics
+ fileMetadata.addStripeStats(stripeStatistics); // keep the appended stripe's stats for writeMetadata
stripes.add(dirEntry.build());
@@ -2911,28 +3016,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
rowsInStripe = 0;
}
- private void updateFileStatistics(OrcProto.StripeStatistics stripeStatistics) {
- List<OrcProto.ColumnStatistics> cs = stripeStatistics.getColStatsList();
- List<TreeWriter> allWriters = getAllColumnTreeWriters(treeWriter);
- for (int i = 0; i < allWriters.size(); i++) {
- allWriters.get(i).fileStatistics.merge(ColumnStatisticsImpl.deserialize(cs.get(i)));
- }
- }
-
- private List<TreeWriter> getAllColumnTreeWriters(TreeWriter rootTreeWriter) {
- List<TreeWriter> result = new ArrayList<>();
- getAllColumnTreeWritersImpl(rootTreeWriter, result);
- return result;
- }
-
- private void getAllColumnTreeWritersImpl(TreeWriter tw,
- List<TreeWriter> result) {
- result.add(tw);
- for (TreeWriter child : tw.childrenWriters) {
- getAllColumnTreeWritersImpl(child, result);
- }
- }
-
@Override
public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
if (userMetadata != null) {
http://git-wip-us.apache.org/repos/asf/orc/blob/cea20ac5/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
----------------------------------------------------------------------
diff --git a/java/tools/src/test/org/apache/orc/tools/TestFileDump.java b/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
index 1556ab4..ecdc5fc 100644
--- a/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
+++ b/java/tools/src/test/org/apache/orc/tools/TestFileDump.java
@@ -308,7 +308,7 @@ public class TestFileDump {
writer.addRowBatch(batch);
writer.close();
- assertEquals(1564, 0, writer.getRawDataSize());
+ assertEquals(1564, writer.getRawDataSize()); // (expected, actual): the removed call passed 0 as the actual value of a delta overload
assertEquals(2, writer.getNumberOfRows());
PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
@@ -322,7 +322,7 @@ public class TestFileDump {
Assert.assertEquals("{\"b\":true,\"bt\":10,\"s\":100,\"i\":1000,\"l\":10000,\"f\":4,\"d\":20,\"de\":\"4.2222\",\"t\":\"2014-11-25 18:09:24.0\",\"dt\":\"2014-11-25\",\"str\":\"string\",\"c\":\"hello\",\"vc\":\"hello\",\"m\":[{\"_key\":\"k1\",\"_value\":\"v1\"}],\"a\":[100,200],\"st\":{\"i\":10,\"s\":\"foo\"}}", lines[0]);
Assert.assertEquals("{\"b\":false,\"bt\":20,\"s\":200,\"i\":2000,\"l\":20000,\"f\":8,\"d\":40,\"de\":\"2.2222\",\"t\":\"2014-11-25 18:02:44.0\",\"dt\":\"2014-09-28\",\"str\":\"abcd\",\"c\":\"world\",\"vc\":\"world\",\"m\":[{\"_key\":\"k3\",\"_value\":\"v3\"}],\"a\":[200,300],\"st\":{\"i\":20,\"s\":\"bar\"}}", lines[1]);
}
-
+
// Test that if the fraction of rows that have distinct strings is greater than the configured
// threshold dictionary encoding is turned off. If dictionary encoding is turned off the length
// of the dictionary stream for the column will be 0 in the ORC file dump.