Posted to commits@orc.apache.org by om...@apache.org on 2018/05/01 21:44:42 UTC
orc git commit: ORC-49. Create RLE-based encoding for short decimals in ORCv2.
Repository: orc
Updated Branches:
refs/heads/master 9eda0e4d5 -> 45b9c5082
ORC-49. Create RLE-based encoding for short decimals in ORCv2.
Fixes #257
Closes #147
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/45b9c508
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/45b9c508
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/45b9c508
Branch: refs/heads/master
Commit: 45b9c508206864fe3147979653dcdf2baafbad29
Parents: 9eda0e4
Author: Owen O'Malley <om...@apache.org>
Authored: Thu Apr 26 22:13:26 2018 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue May 1 14:42:25 2018 -0700
----------------------------------------------------------------------
java/core/src/java/org/apache/orc/OrcFile.java | 16 +-
.../java/org/apache/orc/impl/OrcCodecPool.java | 7 +
.../src/java/org/apache/orc/impl/OutStream.java | 2 +-
.../java/org/apache/orc/impl/ReaderImpl.java | 3 +-
.../org/apache/orc/impl/RecordReaderImpl.java | 11 +-
.../orc/impl/RunLengthIntegerReaderV2.java | 6 +-
.../org/apache/orc/impl/TreeReaderFactory.java | 125 ++++
.../java/org/apache/orc/impl/WriterImpl.java | 9 +-
.../org/apache/orc/impl/WriterInternal.java | 36 ++
.../orc/impl/writer/Decimal64TreeWriter.java | 158 +++++
.../org/apache/orc/impl/writer/TreeWriter.java | 11 +-
.../apache/orc/impl/writer/WriterImplV2.java | 617 +++++++++++++++++++
.../test/org/apache/orc/TestVectorOrcFile.java | 184 ++++--
site/specification/ORCv2.md | 17 +-
14 files changed, 1124 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
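The new encoding is opt-in: the Decimal64 path is taken only when the writer
version is UNSTABLE_PRE_2_0 and the column's precision fits in a signed
64-bit long (precision <= TypeDescription.MAX_DECIMAL64_PRECISION, i.e. 18
digits). A minimal sketch of opting in, using only the public OrcFile API
touched by this commit (the path and schema here are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class Decimal64Example {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // decimal(18,3) is within MAX_DECIMAL64_PRECISION, so the writer
        // below routes through the new Decimal64TreeWriter.
        TypeDescription schema =
            TypeDescription.fromString("struct<d:decimal(18,3)>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/decimal64.orc"),
            OrcFile.writerOptions(conf)
                .setSchema(schema)
                .version(OrcFile.Version.UNSTABLE_PRE_2_0));
        writer.close();
      }
    }

Reading needs no option: the reader selects Decimal64TreeReader from the
version recorded in the file, as the TreeReaderFactory hunk below shows.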
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index c9262e9..ac0beff 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -36,6 +36,8 @@ import org.apache.orc.impl.MemoryManagerImpl;
import org.apache.orc.impl.OrcTail;
import org.apache.orc.impl.ReaderImpl;
import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.WriterInternal;
+import org.apache.orc.impl.writer.WriterImplV2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -810,8 +812,16 @@ public class OrcFile {
) throws IOException {
FileSystem fs = opts.getFileSystem() == null ?
path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
-
- return new WriterImpl(fs, path, opts);
+ switch (opts.getVersion()) {
+ case V_0_11:
+ case V_0_12:
+ return new WriterImpl(fs, path, opts);
+ case UNSTABLE_PRE_2_0:
+ return new WriterImplV2(fs, path, opts);
+ default:
+ throw new IllegalArgumentException("Unknown version " +
+ opts.getVersion());
+ }
}
/**
@@ -964,7 +974,7 @@ public class OrcFile {
mergeMetadata(userMetadata, reader);
if (bufferSize < reader.getCompressionSize()) {
bufferSize = reader.getCompressionSize();
- ((WriterImpl) output).increaseCompressionSize(bufferSize);
+ ((WriterInternal) output).increaseCompressionSize(bufferSize);
}
}
List<OrcProto.StripeStatistics> statList =
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
index 24ee618..1c4b66f 100644
--- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
+++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java
@@ -100,6 +100,13 @@ public final class OrcCodecPool {
}
}
+ /**
+ * Clear the codec pool. Mostly used for testing.
+ */
+ public static void clear() {
+ POOL.clear();
+ }
+
private OrcCodecPool() {
// prevent instantiation
}
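clear() lets a test start from an empty pool before asserting on codec
reuse, which is how testCodecPool in the test changes below uses it. A
typical pattern (the class and method names here are hypothetical):

    import org.apache.orc.impl.OrcCodecPool;
    import org.junit.Before;

    public class SomeCodecTest {
      @Before
      public void resetCodecPool() {
        // Drop codecs pooled by earlier tests so reuse assertions are exact.
        OrcCodecPool.clear();
      }
    }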
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/OutStream.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java
index a6009ab..d6302d5 100644
--- a/java/core/src/java/org/apache/orc/impl/OutStream.java
+++ b/java/core/src/java/org/apache/orc/impl/OutStream.java
@@ -108,7 +108,7 @@ public class OutStream extends PositionedOutputStream {
* @param bufferSize The ORC compression buffer size being checked.
* @throws IllegalArgumentException If bufferSize value exceeds threshold.
*/
- static void assertBufferSizeValid(int bufferSize) throws IllegalArgumentException {
+ public static void assertBufferSizeValid(int bufferSize) throws IllegalArgumentException {
if (bufferSize >= (1 << 23)) {
throw new IllegalArgumentException("Illegal value of ORC compression buffer size: " + bufferSize);
}
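The 1 << 23 ceiling comes from ORC's three-byte compressed-chunk header: one
bit flags an uncompressed ("original") chunk and the remaining 23 bits carry
the chunk length, so a compression buffer of 2^23 bytes or more cannot be
represented. A sketch of that header layout (illustrative only, not the
OutStream implementation):

    // ORC compression chunk header: 3 bytes, little-endian.
    // Bit 0 = isOriginal (chunk stored uncompressed); bits 1-23 = length.
    static byte[] chunkHeader(int length, boolean isOriginal) {
      int header = (length << 1) | (isOriginal ? 1 : 0);
      return new byte[]{
          (byte) header, (byte) (header >>> 8), (byte) (header >>> 16)};
    }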
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 0daa0ed..e8d6be9 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -224,7 +224,8 @@ public class ReaderImpl implements Reader {
return deserializeStats(fileStats);
}
- static ColumnStatistics[] deserializeStats(List<OrcProto.ColumnStatistics> fileStats){
+ public static ColumnStatistics[] deserializeStats(
+ List<OrcProto.ColumnStatistics> fileStats) {
ColumnStatistics[] result = new ColumnStatistics[fileStats.size()];
for(int i=0; i < result.length; ++i) {
result[i] = ColumnStatisticsImpl.deserialize(fileStats.get(i));
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index 83eb039..53cc761 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -255,10 +255,13 @@ public class RecordReaderImpl implements RecordReader {
skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
}
- TreeReaderFactory.ReaderContext readerContext = new TreeReaderFactory.ReaderContext()
- .setSchemaEvolution(evolution)
- .skipCorrupt(skipCorrupt);
- reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(), readerContext);
+ TreeReaderFactory.ReaderContext readerContext =
+ new TreeReaderFactory.ReaderContext()
+ .setSchemaEvolution(evolution)
+ .skipCorrupt(skipCorrupt)
+ .fileFormat(fileReader.getFileVersion());
+ reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
+ readerContext);
this.fileIncluded = evolution.getFileIncluded();
indexes = new OrcProto.RowIndex[types.size()];
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
index 610d9b5..1a94aad 100644
--- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
@@ -363,9 +363,13 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
public void nextVector(ColumnVector previous,
long[] data,
int previousLen) throws IOException {
+ // if all nulls, just return
+ if (previous.isRepeating && !previous.noNulls && previous.isNull[0]) {
+ return;
+ }
previous.isRepeating = true;
for (int i = 0; i < previousLen; i++) {
- if (!previous.isNull[i]) {
+ if (previous.noNulls || !previous.isNull[i]) {
data[i] = next();
} else {
// The default value of null for int type in vectorized
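The early return relies on the vectorized null convention from Hive's
ColumnVector: when isRepeating is set, row 0 describes every row, so
isRepeating && !noNulls && isNull[0] means the whole batch is null and there
are no values to decode. A sketch of the convention:

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    static LongColumnVector allNullBatch(int size) {
      LongColumnVector cv = new LongColumnVector(size);
      cv.isRepeating = true;   // row 0 stands for every row
      cv.noNulls = false;
      cv.isNull[0] = true;     // hence: the whole batch is null
      return cv;
    }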
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 652067d..ccae522 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.OrcProto;
import org.apache.orc.impl.writer.TimestampTreeWriter;
@@ -57,12 +58,15 @@ public class TreeReaderFactory {
boolean isSkipCorrupt();
String getWriterTimezone();
+
+ OrcFile.Version getFileFormat();
}
public static class ReaderContext implements Context {
private SchemaEvolution evolution;
private boolean skipCorrupt = false;
private String writerTimezone;
+ private OrcFile.Version fileFormat;
public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
this.evolution = evolution;
@@ -79,6 +83,11 @@ public class TreeReaderFactory {
return this;
}
+ public ReaderContext fileFormat(OrcFile.Version version) {
+ this.fileFormat = version;
+ return this;
+ }
+
@Override
public SchemaEvolution getSchemaEvolution() {
return evolution;
@@ -93,6 +102,11 @@ public class TreeReaderFactory {
public String getWriterTimezone() {
return writerTimezone;
}
+
+ @Override
+ public OrcFile.Version getFileFormat() {
+ return fileFormat;
+ }
}
public abstract static class TreeReader {
@@ -1259,6 +1273,107 @@ public class TreeReaderFactory {
}
}
+ public static class Decimal64TreeReader extends TreeReader {
+ protected final int precision;
+ protected final int scale;
+ protected final boolean skipCorrupt;
+ protected RunLengthIntegerReaderV2 valueReader;
+
+ Decimal64TreeReader(int columnId,
+ int precision,
+ int scale,
+ Context context) throws IOException {
+ this(columnId, null, null, null, precision, scale, context);
+ }
+
+ protected Decimal64TreeReader(int columnId,
+ InStream present,
+ InStream valueStream,
+ OrcProto.ColumnEncoding encoding,
+ int precision,
+ int scale,
+ Context context) throws IOException {
+ super(columnId, present, context);
+ this.precision = precision;
+ this.scale = scale;
+ valueReader = new RunLengthIntegerReaderV2(valueStream, true,
+ context.isSkipCorrupt());
+ skipCorrupt = context.isSkipCorrupt();
+ }
+
+ @Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId);
+ }
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ OrcProto.StripeFooter stripeFooter
+ ) throws IOException {
+ super.startStripe(streams, stripeFooter);
+ InStream stream = streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA));
+ valueReader = new RunLengthIntegerReaderV2(stream, true, skipCorrupt);
+ }
+
+ @Override
+ public void seek(PositionProvider[] index) throws IOException {
+ seek(index[columnId]);
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ super.seek(index);
+ valueReader.seek(index);
+ }
+
+ private void nextVector(DecimalColumnVector result,
+ final int batchSize) throws IOException {
+ if (result.noNulls) {
+ for (int r=0; r < batchSize; ++r) {
+ result.vector[r].setFromLongAndScale(valueReader.next(), scale);
+ }
+ } else if (!result.isRepeating || !result.isNull[0]) {
+ for (int r=0; r < batchSize; ++r) {
+ if (result.noNulls || !result.isNull[r]) {
+ result.vector[r].setFromLongAndScale(valueReader.next(), scale);
+ }
+ }
+ }
+ result.precision = (short) precision;
+ result.scale = (short) scale;
+ }
+
+ private void nextVector(Decimal64ColumnVector result,
+ final int batchSize) throws IOException {
+ valueReader.nextVector(result, result.vector, batchSize);
+ result.precision = (short) precision;
+ result.scale = (short) scale;
+ }
+
+ @Override
+ public void nextVector(ColumnVector result,
+ boolean[] isNull,
+ final int batchSize) throws IOException {
+ // Read present/isNull stream
+ super.nextVector(result, isNull, batchSize);
+ if (result instanceof Decimal64ColumnVector) {
+ nextVector((Decimal64ColumnVector) result, batchSize);
+ } else {
+ nextVector((DecimalColumnVector) result, batchSize);
+ }
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ valueReader.skip(items);
+ }
+ }
+
/**
* A tree reader that will read string columns. At the start of the
* stripe, it creates an internal reader based on whether a direct or
@@ -1352,6 +1467,8 @@ public class TreeReaderFactory {
LongColumnVector scratchlcv,
BytesColumnVector result, final int batchSize) throws IOException {
// Read lengths
+ scratchlcv.isRepeating = result.isRepeating;
+ scratchlcv.noNulls = result.noNulls;
scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here...
lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
int totalLength = 0;
@@ -1645,6 +1762,8 @@ public class TreeReaderFactory {
}
// Read string offsets
+ scratchlcv.isRepeating = result.isRepeating;
+ scratchlcv.noNulls = result.noNulls;
scratchlcv.isNull = result.isNull;
scratchlcv.ensureSize((int) batchSize, false);
reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
@@ -2199,6 +2318,7 @@ public class TreeReaderFactory {
public static TreeReader createTreeReader(TypeDescription readerType,
Context context
) throws IOException {
+ OrcFile.Version version = context.getFileFormat();
final SchemaEvolution evolution = context.getSchemaEvolution();
TypeDescription fileType = evolution.getFileType(readerType);
if (fileType == null || !evolution.includeReaderColumn(readerType.getId())){
@@ -2241,6 +2361,11 @@ public class TreeReaderFactory {
case DATE:
return new DateTreeReader(fileType.getId(), context);
case DECIMAL:
+ if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
+ fileType.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION){
+ return new Decimal64TreeReader(fileType.getId(), fileType.getPrecision(),
+ fileType.getScale(), context);
+ }
return new DecimalTreeReader(fileType.getId(), fileType.getPrecision(),
fileType.getScale(), context);
case STRUCT:
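Reader and writer agree on the wire representation: a decimal(p, s) value
with p <= 18 travels as a single signed long equal to its unscaled value,
i.e. value * 10^s, encoded with RLEv2. A round-trip sketch using the two
HiveDecimalWritable methods the diff relies on:

    import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;

    static void roundTrip() {
      HiveDecimalWritable w = new HiveDecimalWritable("12.345");
      long encoded = w.serialize64(3);       // 12345 = 12.345 * 10^3
      HiveDecimalWritable back = new HiveDecimalWritable();
      back.setFromLongAndScale(encoded, 3);  // back now holds 12.345
    }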
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index 0ddd00a..c3656d4 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -72,7 +72,7 @@ import com.google.protobuf.ByteString;
* to be confined to a single thread as well.
*
*/
-public class WriterImpl implements Writer, MemoryManager.Callback {
+public class WriterImpl implements WriterInternal, MemoryManager.Callback {
private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
@@ -193,12 +193,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return estBufferSize > bs ? bs : estBufferSize;
}
- /**
- * Increase the buffer size for this writer.
- * This function is internal only and should only be called by the
- * ORC file merger.
- * @param newSize the new buffer size.
- */
+ @Override
public void increaseCompressionSize(int newSize) {
if (newSize > bufferSize) {
bufferSize = newSize;
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/WriterInternal.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterInternal.java b/java/core/src/java/org/apache/orc/impl/WriterInternal.java
new file mode 100644
index 0000000..29f5cf5
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/WriterInternal.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.orc.Writer;
+
+/**
+ * The ORC internal API to the writer.
+ */
+public interface WriterInternal extends Writer {
+
+ /**
+ * Increase the buffer size for this writer.
+ * This function is internal only and should only be called by the
+ * ORC file merger.
+ * @param newSize the new buffer size.
+ */
+ void increaseCompressionSize(int newSize);
+
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
new file mode 100644
index 0000000..b79e3b2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/Decimal64TreeWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PositionRecorder;
+import org.apache.orc.impl.RunLengthIntegerWriterV2;
+
+import java.io.IOException;
+
+/**
+ * Writer for short decimals in ORCv2.
+ */
+public class Decimal64TreeWriter extends TreeWriterBase {
+ private final RunLengthIntegerWriterV2 valueWriter;
+ private final int scale;
+
+ public Decimal64TreeWriter(int columnId,
+ TypeDescription schema,
+ WriterContext writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ OutStream stream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ // Use RLEv2 until we have the new RLEv3.
+ valueWriter = new RunLengthIntegerWriterV2(stream, true, true);
+ scale = schema.getScale();
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ private void writeBatch(DecimalColumnVector vector, int offset,
+ int length) throws IOException {
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ HiveDecimalWritable value = vector.vector[0];
+ long lg = value.serialize64(scale);
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilterUtf8.addLong(lg);
+ }
+ for (int i = 0; i < length; ++i) {
+ valueWriter.write(lg);
+ }
+ }
+ } else {
+ for (int i = 0; i < length; ++i) {
+ if (vector.noNulls || !vector.isNull[i + offset]) {
+ HiveDecimalWritable value = vector.vector[i + offset];
+ long lg = value.serialize64(scale);
+ valueWriter.write(lg);
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilterUtf8.addLong(lg);
+ }
+ }
+ }
+ }
+ }
+
+ private void writeBatch(Decimal64ColumnVector vector, int offset,
+ int length) throws IOException {
+ assert(scale == vector.scale);
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ HiveDecimalWritable value = vector.getScratchWritable();
+ long lg = vector.vector[0];
+ value.setFromLongAndScale(lg, vector.scale);
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilterUtf8.addLong(lg);
+ }
+ for (int i = 0; i < length; ++i) {
+ valueWriter.write(lg);
+ }
+ }
+ } else {
+ HiveDecimalWritable value = vector.getScratchWritable();
+ for (int i = 0; i < length; ++i) {
+ if (vector.noNulls || !vector.isNull[i + offset]) {
+ long lg = vector.vector[i + offset];
+ valueWriter.write(lg);
+ value.setFromLongAndScale(lg, vector.scale);
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilterUtf8.addLong(lg);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ if (vector instanceof Decimal64ColumnVector) {
+ writeBatch((Decimal64ColumnVector) vector, offset, length);
+ } else {
+ writeBatch((DecimalColumnVector) vector, offset, length);
+ }
+ }
+
+ @Override
+ public void writeStripe(OrcProto.StripeFooter.Builder builder,
+ OrcProto.StripeStatistics.Builder stats,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, stats, requiredIndexEntries);
+ if (rowIndexPosition != null) {
+ recordPosition(rowIndexPosition);
+ }
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ valueWriter.getPosition(recorder);
+ }
+
+ @Override
+ public long estimateMemory() {
+ return super.estimateMemory() + valueWriter.estimateMemory();
+ }
+
+ @Override
+ public long getRawDataSize() {
+ return fileStatistics.getNumberOfValues() * JavaDataModel.get().primitive2();
+ }
+
+ @Override
+ public void flushStreams() throws IOException {
+ super.flushStreams();
+ valueWriter.flush();
+ }
+}
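On the fast path the writer receives a Decimal64ColumnVector, whose vector[]
already holds scaled longs, so writeBatch only needs a HiveDecimalWritable
for statistics and bloom filters. A sketch of filling such a batch
(createRowBatchV2 yields Decimal64ColumnVector columns, as the test changes
below show):

    import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;

    static VectorizedRowBatch oneRow() {
      TypeDescription schema =
          TypeDescription.fromString("struct<d:decimal(18,3)>");
      VectorizedRowBatch batch = schema.createRowBatchV2();
      Decimal64ColumnVector d = (Decimal64ColumnVector) batch.cols[0];
      d.vector[0] = 12345;   // 12.345 at scale 3, no HiveDecimal allocation
      batch.size = 1;
      return batch;          // pass to writer.addRowBatch(batch)
    }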
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
index b1a6bec..bfa403e 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/TreeWriter.java
@@ -20,6 +20,7 @@ package org.apache.orc.impl.writer;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.TypeDescription;
@@ -45,7 +46,7 @@ public interface TreeWriter {
long getRawDataSize();
/**
- * Write a VectorizedRowBath to the file. This is called by the WriterImpl
+ * Write a VectorizedRowBatch to the file. This is called by the WriterImplV2
* at the top level.
* @param batch the list of all of the columns
* @param offset the first row from the batch to write
@@ -101,10 +102,11 @@ public interface TreeWriter {
*/
void writeFileStatistics(OrcProto.Footer.Builder footer);
- public class Factory {
+ class Factory {
public static TreeWriter create(TypeDescription schema,
WriterContext streamFactory,
boolean nullable) throws IOException {
+ OrcFile.Version version = streamFactory.getVersion();
switch (schema.getCategory()) {
case BOOLEAN:
return new BooleanTreeWriter(schema.getId(),
@@ -142,6 +144,11 @@ public interface TreeWriter {
return new DateTreeWriter(schema.getId(),
schema, streamFactory, nullable);
case DECIMAL:
+ if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
+ schema.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION) {
+ return new Decimal64TreeWriter(schema.getId(),
+ schema, streamFactory, nullable);
+ }
return new DecimalTreeWriter(schema.getId(),
schema, streamFactory, nullable);
case STRUCT:
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
new file mode 100644
index 0000000..ab4fc58
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl.writer;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.ColumnStatistics;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.PhysicalWriter;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.PhysicalFsWriter;
+import org.apache.orc.impl.ReaderImpl;
+import org.apache.orc.impl.StreamName;
+import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.WriterInternal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+/**
+ * An ORCv2 file writer. The file is divided into stripes, the natural unit
+ * of work when reading. Each stripe is buffered in memory until the memory
+ * use reaches the stripe size, and then it is written out broken down by
+ * columns. Each column is written by a TreeWriter that is specific to that
+ * type of column. TreeWriters may have children TreeWriters that handle the
+ * sub-types. Each of the TreeWriters writes the column's data as a set of
+ * streams.
+ *
+ * This class is unsynchronized like most Stream objects, so from the creation
+ * of an OrcFile onward, all access to a single instance has to be from a
+ * single thread.
+ *
+ * There are no known cases today where a single instance is used from
+ * multiple threads.
+ *
+ * Caveat: the MemoryManager is created during WriterOptions creation, and it
+ * has to be confined to a single thread as well.
+ *
+ */
+public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WriterImplV2.class);
+
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ private final Path path;
+ private long adjustedStripeSize;
+ private final int rowIndexStride;
+ private final CompressionKind compress;
+ private int bufferSize;
+ private final TypeDescription schema;
+ private final PhysicalWriter physicalWriter;
+ private final OrcFile.WriterVersion writerVersion;
+
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private long rawDataSize = 0;
+ private int rowsInIndex = 0;
+ private long lastFlushOffset = 0;
+ private int stripesAtLastFlush = -1;
+ private final List<OrcProto.StripeInformation> stripes =
+ new ArrayList<>();
+ private final OrcProto.Metadata.Builder fileMetadata =
+ OrcProto.Metadata.newBuilder();
+ private final Map<String, ByteString> userMetadata =
+ new TreeMap<>();
+ private final TreeWriter treeWriter;
+ private final boolean buildIndex;
+ private final MemoryManager memoryManager;
+ private final OrcFile.Version version;
+ private final Configuration conf;
+ private final OrcFile.WriterCallback callback;
+ private final OrcFile.WriterContext callbackContext;
+ private final OrcFile.EncodingStrategy encodingStrategy;
+ private final OrcFile.CompressionStrategy compressionStrategy;
+ private final boolean[] bloomFilterColumns;
+ private final double bloomFilterFpp;
+ private final OrcFile.BloomFilterVersion bloomFilterVersion;
+ private final boolean writeTimeZone;
+
+ public WriterImplV2(FileSystem fs,
+ Path path,
+ OrcFile.WriterOptions opts) throws IOException {
+ this.path = path;
+ this.conf = opts.getConfiguration();
+ this.callback = opts.getCallback();
+ this.schema = opts.getSchema();
+ this.writerVersion = opts.getWriterVersion();
+ bloomFilterVersion = opts.getBloomFilterVersion();
+ if (callback != null) {
+ callbackContext = new OrcFile.WriterContext(){
+
+ @Override
+ public Writer getWriter() {
+ return WriterImplV2.this;
+ }
+ };
+ } else {
+ callbackContext = null;
+ }
+ writeTimeZone = hasTimestamp(schema);
+ this.adjustedStripeSize = opts.getStripeSize();
+ this.version = opts.getVersion();
+ this.encodingStrategy = opts.getEncodingStrategy();
+ this.compressionStrategy = opts.getCompressionStrategy();
+ this.compress = opts.getCompress();
+ this.rowIndexStride = opts.getRowIndexStride();
+ this.memoryManager = opts.getMemoryManager();
+ buildIndex = rowIndexStride > 0;
+ int numColumns = schema.getMaximumId() + 1;
+ if (opts.isEnforceBufferSize()) {
+ OutStream.assertBufferSizeValid(opts.getBufferSize());
+ this.bufferSize = opts.getBufferSize();
+ } else {
+ this.bufferSize = WriterImpl.getEstimatedBufferSize(adjustedStripeSize,
+ numColumns, opts.getBufferSize());
+ }
+ if (version == OrcFile.Version.FUTURE) {
+ throw new IllegalArgumentException("Can not write in a unknown version.");
+ } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) {
+ LOG.warn("ORC files written in " + version.getName() + " will not be" +
+ " readable by other versions of the software. It is only for" +
+ " developer testing.");
+ }
+ if (version == OrcFile.Version.V_0_11) {
+ /* do not write bloom filters for ORC v11 */
+ this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
+ } else {
+ this.bloomFilterColumns =
+ OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
+ }
+ this.bloomFilterFpp = opts.getBloomFilterFpp();
+ this.physicalWriter = opts.getPhysicalWriter() == null ?
+ new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
+ physicalWriter.writeHeader();
+ treeWriter = TreeWriter.Factory.create(schema, new StreamFactory(), false);
+ if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+ throw new IllegalArgumentException("Row stride must be at least " +
+ MIN_ROW_INDEX_STRIDE);
+ }
+
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, opts.getStripeSize(), this);
+ LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+ " compression: {} bufferSize: {}", path, adjustedStripeSize, opts.getBlockSize(),
+ compress, bufferSize);
+ }
+
+ @Override
+ public boolean checkMemory(double newScale) throws IOException {
+ long limit = Math.round(adjustedStripeSize * newScale);
+ long size = treeWriter.estimateMemory();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC writer " + physicalWriter + " size = " + size +
+ " limit = " + limit);
+ }
+ if (size > limit) {
+ flushStripe();
+ return true;
+ }
+ return false;
+ }
+
+
+ CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
+ // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
+ // but at this point there's no close() for the stream.
+ CompressionCodec result = physicalWriter.getCompressionCodec();
+ if (result != null) {
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ case BLOOM_FILTER_UTF8:
+ if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
+ CompressionCodec.Modifier.TEXT));
+ } else {
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+ CompressionCodec.Modifier.TEXT));
+ }
+ break;
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
+ CompressionCodec.Modifier.BINARY));
+ break;
+ default:
+ LOG.info("Missing ORC compression modifiers for " + kind);
+ break;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void increaseCompressionSize(int newSize) {
+ if (newSize > bufferSize) {
+ bufferSize = newSize;
+ }
+ }
+
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory implements WriterContext {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return The output outStream that the section needs to be written to.
+ */
+ public OutStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ final StreamName name = new StreamName(column, kind);
+ CompressionCodec codec = getCustomizedCodec(kind);
+
+ return new OutStream(physicalWriter.toString(), bufferSize, codec,
+ physicalWriter.createDataStream(name));
+ }
+
+ /**
+ * Get the stride rate of the row index.
+ */
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ /**
+ * Whether we should be building the row index.
+ * @return true if we are building the index
+ */
+ public boolean buildIndex() {
+ return buildIndex;
+ }
+
+ /**
+ * Is the ORC file compressed?
+ * @return are the streams compressed
+ */
+ public boolean isCompressed() {
+ return physicalWriter.getCompressionCodec() != null;
+ }
+
+ /**
+ * Get the encoding strategy to use.
+ * @return encoding strategy
+ */
+ public OrcFile.EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ /**
+ * Get the bloom filter columns
+ * @return bloom filter columns
+ */
+ public boolean[] getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ /**
+ * Get bloom filter false positive percentage.
+ * @return fpp
+ */
+ public double getBloomFilterFPP() {
+ return bloomFilterFpp;
+ }
+
+ /**
+ * Get the writer's configuration.
+ * @return configuration
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get the version of the file to write.
+ */
+ public OrcFile.Version getVersion() {
+ return version;
+ }
+
+ /**
+ * Get the PhysicalWriter.
+ *
+ * @return the file's physical writer.
+ */
+ @Override
+ public PhysicalWriter getPhysicalWriter() {
+ return physicalWriter;
+ }
+
+ public OrcFile.BloomFilterVersion getBloomFilterVersion() {
+ return bloomFilterVersion;
+ }
+
+ public void writeIndex(StreamName name,
+ OrcProto.RowIndex.Builder index) throws IOException {
+ physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
+ }
+
+ public void writeBloomFilter(StreamName name,
+ OrcProto.BloomFilterIndex.Builder bloom
+ ) throws IOException {
+ physicalWriter.writeBloomFilter(name, bloom,
+ getCustomizedCodec(name.getKind()));
+ }
+ }
+
+
+ private static void writeTypes(OrcProto.Footer.Builder builder,
+ TypeDescription schema) {
+ builder.addAllTypes(OrcUtils.getOrcTypes(schema));
+ }
+
+ private void createRowIndexEntry() throws IOException {
+ treeWriter.createRowIndexEntry();
+ rowsInIndex = 0;
+ }
+
+ private void flushStripe() throws IOException {
+ if (buildIndex && rowsInIndex != 0) {
+ createRowIndexEntry();
+ }
+ if (rowsInStripe != 0) {
+ if (callback != null) {
+ callback.preStripeWrite(callbackContext);
+ }
+ // finalize the data for the stripe
+ int requiredIndexEntries = rowIndexStride == 0 ? 0 :
+ (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+ OrcProto.StripeFooter.Builder builder =
+ OrcProto.StripeFooter.newBuilder();
+ if (writeTimeZone) {
+ builder.setWriterTimezone(TimeZone.getDefault().getID());
+ }
+ OrcProto.StripeStatistics.Builder stats =
+ OrcProto.StripeStatistics.newBuilder();
+
+ treeWriter.flushStreams();
+ treeWriter.writeStripe(builder, stats, requiredIndexEntries);
+
+ OrcProto.StripeInformation.Builder dirEntry =
+ OrcProto.StripeInformation.newBuilder()
+ .setNumberOfRows(rowsInStripe);
+ physicalWriter.finalizeStripe(builder, dirEntry);
+
+ fileMetadata.addStripeStats(stats.build());
+ stripes.add(dirEntry.build());
+ rowCount += rowsInStripe;
+ rowsInStripe = 0;
+ }
+ }
+
+ private long computeRawDataSize() {
+ return treeWriter.getRawDataSize();
+ }
+
+ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
+ switch (kind) {
+ case NONE: return OrcProto.CompressionKind.NONE;
+ case ZLIB: return OrcProto.CompressionKind.ZLIB;
+ case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+ case LZO: return OrcProto.CompressionKind.LZO;
+ case LZ4: return OrcProto.CompressionKind.LZ4;
+ default:
+ throw new IllegalArgumentException("Unknown compression " + kind);
+ }
+ }
+
+ private void writeFileStatistics(OrcProto.Footer.Builder builder,
+ TreeWriter writer) throws IOException {
+ writer.writeFileStatistics(builder);
+ }
+
+ private void writeMetadata() throws IOException {
+ physicalWriter.writeFileMetadata(fileMetadata);
+ }
+
+ private long writePostScript() throws IOException {
+ OrcProto.PostScript.Builder builder =
+ OrcProto.PostScript.newBuilder()
+ .setCompression(writeCompressionKind(compress))
+ .setMagic(OrcFile.MAGIC)
+ .addVersion(version.getMajor())
+ .addVersion(version.getMinor())
+ .setWriterVersion(writerVersion.getId());
+ if (compress != CompressionKind.NONE) {
+ builder.setCompressionBlockSize(bufferSize);
+ }
+ return physicalWriter.writePostScript(builder);
+ }
+
+ private long writeFooter() throws IOException {
+ writeMetadata();
+ OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+ builder.setNumberOfRows(rowCount);
+ builder.setRowIndexStride(rowIndexStride);
+ rawDataSize = computeRawDataSize();
+ // serialize the types
+ writeTypes(builder, schema);
+ // add the stripe information
+ for(OrcProto.StripeInformation stripe: stripes) {
+ builder.addStripes(stripe);
+ }
+ // add the column statistics
+ writeFileStatistics(builder, treeWriter);
+ // add all of the user metadata
+ for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
+ builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
+ .setName(entry.getKey()).setValue(entry.getValue()));
+ }
+ builder.setWriter(OrcFile.WriterImplementation.ORC_JAVA.getId());
+ physicalWriter.writeFileFooter(builder);
+ return writePostScript();
+ }
+
+ @Override
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
+ @Override
+ public void addUserMetadata(String name, ByteBuffer value) {
+ userMetadata.put(name, ByteString.copyFrom(value));
+ }
+
+ @Override
+ public void addRowBatch(VectorizedRowBatch batch) throws IOException {
+ if (buildIndex) {
+ // Batch the writes up to the rowIndexStride so that we can get the
+ // right size indexes.
+ int posn = 0;
+ while (posn < batch.size) {
+ int chunkSize = Math.min(batch.size - posn,
+ rowIndexStride - rowsInIndex);
+ treeWriter.writeRootBatch(batch, posn, chunkSize);
+ posn += chunkSize;
+ rowsInIndex += chunkSize;
+ rowsInStripe += chunkSize;
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
+ }
+ }
+ } else {
+ rowsInStripe += batch.size;
+ treeWriter.writeRootBatch(batch, 0, batch.size);
+ }
+ memoryManager.addedRow(batch.size);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (callback != null) {
+ callback.preFooterWrite(callbackContext);
+ }
+ // remove us from the memory manager so that we don't get any callbacks
+ memoryManager.removeWriter(path);
+ // actually close the file
+ flushStripe();
+ lastFlushOffset = writeFooter();
+ physicalWriter.close();
+ }
+
+ /**
+ * Raw data size will be computed when writing the file footer. Hence the raw
+ * data size value will be available only after closing the writer.
+ */
+ @Override
+ public long getRawDataSize() {
+ return rawDataSize;
+ }
+
+ /**
+ * Row count gets updated when flushing the stripes. To get an accurate row
+ * count, call this method after the writer is closed.
+ */
+ @Override
+ public long getNumberOfRows() {
+ return rowCount;
+ }
+
+ @Override
+ public long writeIntermediateFooter() throws IOException {
+ // flush any buffered rows
+ flushStripe();
+ // write a footer
+ if (stripesAtLastFlush != stripes.size()) {
+ if (callback != null) {
+ callback.preFooterWrite(callbackContext);
+ }
+ lastFlushOffset = writeFooter();
+ stripesAtLastFlush = stripes.size();
+ physicalWriter.flush();
+ }
+ return lastFlushOffset;
+ }
+
+ static void checkArgument(boolean expression, String message) {
+ if (!expression) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ @Override
+ public void appendStripe(byte[] stripe, int offset, int length,
+ StripeInformation stripeInfo,
+ OrcProto.StripeStatistics stripeStatistics) throws IOException {
+ checkArgument(stripe != null, "Stripe must not be null");
+ checkArgument(length <= stripe.length,
+ "Specified length must not be greater specified array length");
+ checkArgument(stripeInfo != null, "Stripe information must not be null");
+ checkArgument(stripeStatistics != null,
+ "Stripe statistics must not be null");
+
+ rowsInStripe = stripeInfo.getNumberOfRows();
+ // update stripe information
+ OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
+ .newBuilder()
+ .setNumberOfRows(rowsInStripe)
+ .setIndexLength(stripeInfo.getIndexLength())
+ .setDataLength(stripeInfo.getDataLength())
+ .setFooterLength(stripeInfo.getFooterLength());
+ physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length),
+ dirEntry);
+
+ // since we have already written the stripe, just update stripe statistics
+ treeWriter.updateFileStatistics(stripeStatistics);
+ fileMetadata.addStripeStats(stripeStatistics);
+
+ stripes.add(dirEntry.build());
+
+ // reset it after writing the stripe
+ rowCount += rowsInStripe;
+ rowsInStripe = 0;
+ }
+
+ @Override
+ public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
+ if (userMetadata != null) {
+ for (OrcProto.UserMetadataItem item : userMetadata) {
+ this.userMetadata.put(item.getName(), item.getValue());
+ }
+ }
+ }
+
+ @Override
+ public ColumnStatistics[] getStatistics()
+ throws IOException {
+ // Generate the stats
+ OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
+
+ // add the column statistics
+ writeFileStatistics(builder, treeWriter);
+ return ReaderImpl.deserializeStats(builder.getStatisticsList());
+ }
+
+ public CompressionCodec getCompressionCodec() {
+ return physicalWriter.getCompressionCodec();
+ }
+
+ private static boolean hasTimestamp(TypeDescription schema) {
+ if (schema.getCategory() == TypeDescription.Category.TIMESTAMP) {
+ return true;
+ }
+ List<TypeDescription> children = schema.getChildren();
+ if (children != null) {
+ for (TypeDescription child : children) {
+ if (hasTimestamp(child)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
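One detail worth noting in addRowBatch above: batches are sliced so that
row-index entries land every rowIndexStride rows regardless of batch
boundaries. A standalone sketch of the arithmetic (with a stride of 1000, a
3500-row batch becomes chunks of 1000, 1000, 1000, and 500):

    static void chunk(int batchSize, int stride) {
      int posn = 0, rowsInIndex = 0;
      while (posn < batchSize) {
        int chunkSize = Math.min(batchSize - posn, stride - rowsInIndex);
        // treeWriter.writeRootBatch(batch, posn, chunkSize) would go here
        posn += chunkSize;
        rowsInIndex += chunkSize;
        if (rowsInIndex >= stride) {
          rowsInIndex = 0;   // createRowIndexEntry() resets the counter
        }
      }
    }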
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 81d248c..424eb96 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -21,6 +21,7 @@ package org.apache.orc;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.orc.impl.OrcCodecPool;
+import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.WriterImpl;
import org.apache.orc.OrcFile.WriterOptions;
@@ -54,10 +55,13 @@ import org.apache.orc.impl.DataReaderProperties;
import org.apache.orc.impl.OrcIndex;
import org.apache.orc.impl.RecordReaderImpl;
import org.apache.orc.impl.RecordReaderUtils;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import java.io.File;
@@ -70,6 +74,7 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -82,8 +87,26 @@ import static org.junit.Assert.*;
/**
* Tests for the vectorized reader and writer for ORC files.
*/
+@RunWith(Parameterized.class)
public class TestVectorOrcFile {
+ @Parameterized.Parameter
+ public OrcFile.Version fileFormat;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getParameters() {
+ OrcFile.Version[] params = new OrcFile.Version[]{
+ OrcFile.Version.V_0_11,
+ OrcFile.Version.V_0_12,
+ OrcFile.Version.UNSTABLE_PRE_2_0};
+
+ List<Object[]> result = new ArrayList<>();
+ for(OrcFile.Version v: params) {
+ result.add(new Object[]{v});
+ }
+ return result;
+ }
+
public static String getFileFromClasspath(String name) {
URL url = ClassLoader.getSystemResource(name);
if (url == null) {
@@ -177,12 +200,14 @@ public class TestVectorOrcFile {
conf = new Configuration();
fs = FileSystem.getLocal(conf);
testFilePath = new Path(workDir, "TestVectorOrcFile." +
- testCaseName.getMethodName() + ".orc");
+ testCaseName.getMethodName().replaceFirst("\\[[0-9]+\\]", "")
+ + "." + fileFormat.getName() + ".orc");
fs.delete(testFilePath, false);
}
@Test
public void testReadFormat_0_11() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
Path oldFilePath =
new Path(getFileFromClasspath("orc-file-11-format.orc"));
Reader reader = OrcFile.createReader(oldFilePath,
@@ -351,7 +376,7 @@ public class TestVectorOrcFile {
TypeDescription schema = TypeDescription.createTimestamp();
Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
- .bufferSize(10000).version(org.apache.orc.OrcFile.Version.V_0_11));
+ .bufferSize(10000).version(fileFormat));
List<Timestamp> tslist = Lists.newArrayList();
tslist.add(Timestamp.valueOf("2037-01-01 00:00:00.000999"));
tslist.add(Timestamp.valueOf("2003-01-01 00:00:00.000000222"));
@@ -407,7 +432,8 @@ public class TestVectorOrcFile {
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
- .bufferSize(10000));
+ .bufferSize(10000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 4;
BytesColumnVector field1 = (BytesColumnVector) batch.cols[0];
@@ -456,7 +482,9 @@ public class TestVectorOrcFile {
assertEquals("bar", ((StringColumnStatistics) stats[2]).getMinimum());
assertEquals("hi", ((StringColumnStatistics) stats[2]).getMaximum());
assertEquals(8, ((StringColumnStatistics) stats[2]).getSum());
- assertEquals("count: 3 hasNull: true bytesOnDisk: 22 min: bar max: hi sum: 8",
+ assertEquals("count: 3 hasNull: true bytesOnDisk: " +
+ (fileFormat == OrcFile.Version.V_0_11 ? "30" : "22") +
+ " min: bar max: hi sum: 8",
stats[2].toString());
// check the inspectors
@@ -494,7 +522,8 @@ public class TestVectorOrcFile {
.addField("dec1", TypeDescription.createDecimal());
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000).bufferSize(10000));
+ OrcFile.writerOptions(conf).setSchema(schema).stripeSize(100000)
+ .bufferSize(10000).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 4;
DecimalColumnVector field1 = (DecimalColumnVector) batch.cols[0];
@@ -527,7 +556,8 @@ public class TestVectorOrcFile {
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
- .bufferSize(10000));
+ .bufferSize(10000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1000;
LongColumnVector field1 = (LongColumnVector) batch.cols[0];
@@ -980,7 +1010,8 @@ public class TestVectorOrcFile {
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
- .bufferSize(10000));
+ .bufferSize(10000)
+ .version(fileFormat));
assertEmptyStats(writer.getStatistics());
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 2;
@@ -1050,8 +1081,9 @@ public class TestVectorOrcFile {
assertEquals(1024, ((IntegerColumnStatistics) stats[3]).getMinimum());
assertEquals(true, ((IntegerColumnStatistics) stats[3]).isSumDefined());
assertEquals(3072, ((IntegerColumnStatistics) stats[3]).getSum());
- assertEquals("count: 2 hasNull: false bytesOnDisk: 9 min: 1024 max: 2048 sum: 3072",
- stats[3].toString());
+ assertEquals("count: 2 hasNull: false bytesOnDisk: " +
+ (fileFormat == OrcFile.Version.V_0_11 ? "8" : "9") +
+ " min: 1024 max: 2048 sum: 3072", stats[3].toString());
StripeStatistics ss = reader.getStripeStatistics().get(0);
assertEquals(2, ss.getColumnStatistics()[0].getNumberOfValues());
@@ -1065,7 +1097,9 @@ public class TestVectorOrcFile {
assertEquals("count: 2 hasNull: false bytesOnDisk: 15 min: -15.0 max: -5.0 sum: -20.0",
stats[7].toString());
- assertEquals("count: 2 hasNull: false bytesOnDisk: 14 min: bye max: hi sum: 5", stats[9].toString());
+ assertEquals("count: 2 hasNull: false bytesOnDisk: " +
+ (fileFormat == OrcFile.Version.V_0_11 ? "20" : "14") +
+ " min: bye max: hi sum: 5", stats[9].toString());
// check the schema
TypeDescription readerSchema = reader.getSchema();
@@ -1194,7 +1228,8 @@ public class TestVectorOrcFile {
.stripeSize(1000)
.compress(CompressionKind.NONE)
.bufferSize(100)
- .rowIndexStride(1000));
+ .rowIndexStride(1000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
Random r1 = new Random(1);
Random r2 = new Random(2);
@@ -1289,7 +1324,8 @@ public class TestVectorOrcFile {
.setSchema(schema)
.stripeSize(1000)
.compress(CompressionKind.NONE)
- .bufferSize(100));
+ .bufferSize(100)
+ .version(fileFormat));
writer.close();
Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
@@ -1311,7 +1347,8 @@ public class TestVectorOrcFile {
.setSchema(schema)
.stripeSize(1000)
.compress(CompressionKind.NONE)
- .bufferSize(100));
+ .bufferSize(100)
+ .version(fileFormat));
writer.addUserMetadata("my.meta", byteBuf(1, 2, 3, 4, 5, 6, 7, -1, -2, 127,
-128));
writer.addUserMetadata("clobber", byteBuf(1, 2, 3));
@@ -1370,7 +1407,8 @@ public class TestVectorOrcFile {
.setSchema(schema)
.stripeSize(100000)
.bufferSize(10000)
- .blockPadding(false));
+ .blockPadding(false)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1000;
TimestampColumnVector timestampColVector = (TimestampColumnVector) batch.cols[0];
@@ -1488,7 +1526,8 @@ public class TestVectorOrcFile {
Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
- .compress(CompressionKind.NONE));
+ .compress(CompressionKind.NONE)
+ .version(fileFormat));
Decimal64ColumnVector cv = (Decimal64ColumnVector) batch.cols[0];
cv.precision = 18;
cv.scale = 3;
@@ -1503,6 +1542,13 @@ public class TestVectorOrcFile {
Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
+ assertEquals("count: 19 hasNull: false", reader.getStatistics()[0].toString());
+ // the size of the column in the different formats
+ int size = (fileFormat == OrcFile.Version.V_0_11 ? 89 :
+ fileFormat == OrcFile.Version.V_0_12 ? 90 : 154);
+ assertEquals("count: 19 hasNull: false bytesOnDisk: " + size +
+ " min: -2 max: 100000000000000 sum: 111111111111109.111",
+ reader.getStatistics()[1].toString());
RecordReader rows = reader.rows();
batch = schema.createRowBatchV2();
cv = (Decimal64ColumnVector) batch.cols[0];
@@ -1557,7 +1603,8 @@ public class TestVectorOrcFile {
Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
- .compress(CompressionKind.NONE));
+ .compress(CompressionKind.NONE)
+ .version(fileFormat));
DecimalColumnVector cv = (DecimalColumnVector) batch.cols[0];
cv.precision = 18;
cv.scale = 3;
@@ -1574,6 +1621,14 @@ public class TestVectorOrcFile {
// test with new batch
Reader reader = OrcFile.createReader(testFilePath,
OrcFile.readerOptions(conf).filesystem(fs));
+ assertEquals("count: 19 hasNull: false", reader.getStatistics()[0].toString());
+ // the size of the column in the different formats
+ int size = (fileFormat == OrcFile.Version.V_0_11 ? 63 :
+ fileFormat == OrcFile.Version.V_0_12 ? 65 : 154);
+ assertEquals("count: 19 hasNull: false bytesOnDisk: " + size +
+ " min: -2 max: 10000000000000 sum: 11111111111109.1111",
+ reader.getStatistics()[1].toString());
+
RecordReader rows = reader.rows();
batch = schema.createRowBatchV2();
Decimal64ColumnVector newCv = (Decimal64ColumnVector) batch.cols[0];
@@ -1640,7 +1695,8 @@ public class TestVectorOrcFile {
.stripeSize(1000)
.compress(CompressionKind.NONE)
.bufferSize(100)
- .blockPadding(false));
+ .blockPadding(false)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 6;
setUnion(batch, 0, Timestamp.valueOf("2000-03-12 15:00:00"), 0, 42, null,
@@ -1863,7 +1919,8 @@ public class TestVectorOrcFile {
.setSchema(schema)
.stripeSize(1000)
.compress(CompressionKind.SNAPPY)
- .bufferSize(100));
+ .bufferSize(100)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
Random rand;
writeRandomIntBytesBatches(writer, batch, 10, 1000);
@@ -1902,7 +1959,8 @@ public class TestVectorOrcFile {
.setSchema(schema)
.stripeSize(10000)
.compress(CompressionKind.LZO)
- .bufferSize(1000));
+ .bufferSize(1000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
Random rand = new Random(69);
batch.size = 1000;
@@ -1951,7 +2009,8 @@ public class TestVectorOrcFile {
.setSchema(schema)
.stripeSize(10000)
.compress(CompressionKind.LZ4)
- .bufferSize(1000));
+ .bufferSize(1000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
Random rand = new Random(3);
batch.size = 1000;
@@ -1993,10 +2052,11 @@ public class TestVectorOrcFile {
*/
@Test
public void testCodecPool() throws Exception {
+ OrcCodecPool.clear();
TypeDescription schema = createInnerSchema();
VectorizedRowBatch batch = schema.createRowBatch();
WriterOptions opts = OrcFile.writerOptions(conf)
- .setSchema(schema).stripeSize(1000).bufferSize(100);
+ .setSchema(schema).stripeSize(1000).bufferSize(100).version(fileFormat);
CompressionCodec snappyCodec, zlibCodec;
snappyCodec = writeBatchesAndGetCodec(10, 1000, opts.compress(CompressionKind.SNAPPY), batch);
@@ -2032,12 +2092,17 @@ public class TestVectorOrcFile {
assertTrue(snappyCodec == codec || snappyCodec2 == codec);
}
- private CompressionCodec writeBatchesAndGetCodec(int count, int size, WriterOptions opts,
- VectorizedRowBatch batch) throws IOException {
+ private CompressionCodec writeBatchesAndGetCodec(int count,
+ int size,
+ WriterOptions opts,
+ VectorizedRowBatch batch
+ ) throws IOException {
fs.delete(testFilePath, false);
- Writer writer = OrcFile.createWriter(testFilePath, opts);
+ PhysicalWriter physical = new PhysicalFsWriter(fs, testFilePath, opts);
+ CompressionCodec codec = physical.getCompressionCodec();
+ Writer writer = OrcFile.createWriter(testFilePath,
+ opts.physicalWriter(physical));
writeRandomIntBytesBatches(writer, batch, count, size);
- CompressionCodec codec = ((WriterImpl)writer).getCompressionCodec();
writer.close();
return codec;
}
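
[Editor's note] The refactor above takes the codec from a PhysicalFsWriter because
createWriter may now return either WriterImpl or WriterImplV2, so the old cast
to WriterImpl no longer holds. A minimal sketch of the pooling behavior that
testCodecPool exercises, assuming OrcCodecPool exposes getCodec/returnCodec
alongside the clear() call shown above:

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.OrcCodecPool;

    public class CodecPoolSketch {
      public static void main(String[] args) {
        // Start from a known-empty pool, as the test now does.
        OrcCodecPool.clear();
        // Borrow a codec from the shared pool...
        CompressionCodec codec = OrcCodecPool.getCodec(CompressionKind.SNAPPY);
        try {
          // ...a writer or reader would compress/decompress buffers here...
        } finally {
          // ...and return it so the next caller can reuse the instance.
          OrcCodecPool.returnCodec(CompressionKind.SNAPPY, codec);
        }
        // The identity assertions in the test rely on this reuse:
        CompressionCodec again = OrcCodecPool.getCodec(CompressionKind.SNAPPY);
        assert again == codec;  // run with -ea to enable the check
        OrcCodecPool.returnCodec(CompressionKind.SNAPPY, again);
      }
    }
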
@@ -2086,7 +2151,8 @@ public class TestVectorOrcFile {
.stripeSize(5000)
.compress(CompressionKind.SNAPPY)
.bufferSize(1000)
- .rowIndexStride(0));
+ .rowIndexStride(0)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
Random rand = new Random(24);
batch.size = 5;
@@ -2313,6 +2379,8 @@ public class TestVectorOrcFile {
@Test
public void testMemoryManagementV11() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
+
TypeDescription schema = createInnerSchema();
MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
Writer writer = OrcFile.createWriter(testFilePath,
@@ -2323,7 +2391,7 @@ public class TestVectorOrcFile {
.bufferSize(100)
.rowIndexStride(0)
.memory(memory)
- .version(OrcFile.Version.V_0_11));
+ .version(fileFormat));
assertEquals(testFilePath, memory.path);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1;
@@ -2349,6 +2417,7 @@ public class TestVectorOrcFile {
@Test
public void testMemoryManagementV12() throws Exception {
+ Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11);
TypeDescription schema = createInnerSchema();
MyMemoryManager memory = new MyMemoryManager(conf, 10000, 0.1);
Writer writer = OrcFile.createWriter(testFilePath,
@@ -2359,7 +2428,7 @@ public class TestVectorOrcFile {
.bufferSize(100)
.rowIndexStride(0)
.memory(memory)
- .version(OrcFile.Version.V_0_12));
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
assertEquals(testFilePath, memory.path);
batch.size = 1;
@@ -2395,7 +2464,8 @@ public class TestVectorOrcFile {
.stripeSize(400000L)
.compress(CompressionKind.NONE)
.bufferSize(500)
- .rowIndexStride(1000));
+ .rowIndexStride(1000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.ensureSize(3500);
batch.size = 3500;
@@ -2520,7 +2590,8 @@ public class TestVectorOrcFile {
Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
- .rowIndexStride(1000));
+ .rowIndexStride(1000)
+ .version(fileFormat));
// write 1024 repeating nulls
batch.size = 1024;
@@ -2812,8 +2883,7 @@ public class TestVectorOrcFile {
.addField("char", TypeDescription.createChar().withMaxLength(10))
.addField("varchar", TypeDescription.createVarchar().withMaxLength(10));
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf)
- .setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 4;
for(int c=0; c < batch.cols.length; ++c) {
@@ -2858,12 +2928,14 @@ public class TestVectorOrcFile {
*/
@Test
public void testNonDictionaryRepeatingString() throws Exception {
+ Assume.assumeTrue(fileFormat != OrcFile.Version.V_0_11);
TypeDescription schema = TypeDescription.createStruct()
.addField("str", TypeDescription.createString());
Writer writer = OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf)
.setSchema(schema)
- .rowIndexStride(1000));
+ .rowIndexStride(1000)
+ .version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1024;
for(int r=0; r < batch.size; ++r) {
@@ -2901,7 +2973,7 @@ public class TestVectorOrcFile {
.addField("struct", TypeDescription.createStruct()
.addField("inner", TypeDescription.createLong()));
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1024;
StructColumnVector outer = (StructColumnVector) batch.cols[0];
@@ -2945,7 +3017,7 @@ public class TestVectorOrcFile {
.addUnionChild(TypeDescription.createInt())
.addUnionChild(TypeDescription.createLong()));
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1024;
UnionColumnVector outer = (UnionColumnVector) batch.cols[0];
@@ -3020,7 +3092,7 @@ public class TestVectorOrcFile {
.addField("list",
TypeDescription.createList(TypeDescription.createLong()));
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1024;
ListColumnVector list = (ListColumnVector) batch.cols[0];
@@ -3094,7 +3166,7 @@ public class TestVectorOrcFile {
TypeDescription.createMap(TypeDescription.createLong(),
TypeDescription.createLong()));
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1024;
MapColumnVector map = (MapColumnVector) batch.cols[0];
@@ -3167,7 +3239,7 @@ public class TestVectorOrcFile {
"struct<list1:array<string>," +
"list2:array<binary>>");
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 2;
ListColumnVector list1 = (ListColumnVector) batch.cols[0];
@@ -3199,6 +3271,8 @@ public class TestVectorOrcFile {
@Test
public void testWriterVersion() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
+
// test writer implementation serialization
assertEquals(OrcFile.WriterImplementation.ORC_JAVA,
OrcFile.WriterImplementation.from(0));
@@ -3242,6 +3316,7 @@ public class TestVectorOrcFile {
@Test(expected=IllegalArgumentException.class)
public void testBadPrestoVersion() {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
OrcFile.WriterVersion.from(OrcFile.WriterImplementation.PRESTO, 0);
}
@@ -3251,6 +3326,7 @@ public class TestVectorOrcFile {
*/
@Test
public void testFileVersion() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(null));
assertEquals(OrcFile.Version.V_0_11, ReaderImpl.getFileVersion(new ArrayList<Integer>()));
assertEquals(OrcFile.Version.V_0_11,
@@ -3263,6 +3339,7 @@ public class TestVectorOrcFile {
@Test
public void testMergeUnderstood() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
Path p = new Path("test.orc");
Reader futureVersion = Mockito.mock(Reader.class);
Mockito.when(futureVersion.getFileVersion()).thenReturn(OrcFile.Version.FUTURE);
@@ -3288,11 +3365,14 @@ public class TestVectorOrcFile {
@Test
public void testMerge() throws Exception {
- Path input1 = new Path(workDir, "TestVectorOrcFile.testMerge1.orc");
+ Path input1 = new Path(workDir, "TestVectorOrcFile.testMerge1-" +
+ fileFormat.getName() + ".orc");
fs.delete(input1, false);
- Path input2 = new Path(workDir, "TestVectorOrcFile.testMerge2.orc");
+ Path input2 = new Path(workDir, "TestVectorOrcFile.testMerge2-" +
+ fileFormat.getName() + ".orc");
fs.delete(input2, false);
- Path input3 = new Path(workDir, "TestVectorOrcFile.testMerge3.orc");
+ Path input3 = new Path(workDir, "TestVectorOrcFile.testMerge3-" +
+ fileFormat.getName() + ".orc");
fs.delete(input3, false);
TypeDescription schema = TypeDescription.fromString("struct<a:int,b:string>");
// change all of the options away from default to find anything we
@@ -3303,7 +3383,7 @@ public class TestVectorOrcFile {
.enforceBufferSize()
.bufferSize(20*1024)
.rowIndexStride(1000)
- .version(OrcFile.Version.V_0_11)
+ .version(fileFormat)
.writerVersion(OrcFile.WriterVersion.HIVE_8732);
Writer writer = OrcFile.createWriter(input1, opts);
@@ -3344,7 +3424,8 @@ public class TestVectorOrcFile {
writer.addUserMetadata("d", fromString("bat"));
writer.close();
- Path output1 = new Path(workDir, "TestVectorOrcFile.testMerge.out1.orc");
+ Path output1 = new Path(workDir, "TestVectorOrcFile.testMerge.out1-" +
+ fileFormat.getName() + ".orc");
fs.delete(output1, false);
List<Path> paths = OrcFile.mergeFiles(output1,
OrcFile.writerOptions(conf), Arrays.asList(input1, input2, input3));
@@ -3354,7 +3435,7 @@ public class TestVectorOrcFile {
assertEquals(CompressionKind.LZO, reader.getCompressionKind());
assertEquals(30 * 1024, reader.getCompressionSize());
assertEquals(1000, reader.getRowIndexStride());
- assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+ assertEquals(fileFormat, reader.getFileVersion());
assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
assertEquals(3, reader.getStripes().size());
assertEquals(4, reader.getMetadataKeys().size());
@@ -3364,7 +3445,8 @@ public class TestVectorOrcFile {
assertEquals(fromString("bat"), reader.getMetadataValue("d"));
TypeDescription schema4 = TypeDescription.fromString("struct<a:int>");
- Path input4 = new Path(workDir, "TestVectorOrcFile.testMerge4.orc");
+ Path input4 = new Path(workDir, "TestVectorOrcFile.testMerge4-" +
+ fileFormat.getName() + ".orc");
fs.delete(input4, false);
opts.setSchema(schema4);
writer = OrcFile.createWriter(input4, opts);
@@ -3376,7 +3458,8 @@ public class TestVectorOrcFile {
writer.addRowBatch(batch);
writer.close();
- Path input5 = new Path(workDir, "TestVectorOrcFile.testMerge5.orc");
+ Path input5 = new Path(workDir, "TestVectorOrcFile.testMerge5-" +
+ fileFormat.getName() + ".orc");
fs.delete(input5, false);
opts.setSchema(schema)
.compress(CompressionKind.NONE)
@@ -3391,7 +3474,8 @@ public class TestVectorOrcFile {
writer.addRowBatch(batch);
writer.close();
- Path output2 = new Path(workDir, "TestVectorOrcFile.testMerge.out2.orc");
+ Path output2 = new Path(workDir, "TestVectorOrcFile.testMerge.out2-" +
+ fileFormat.getName() + ".orc");
fs.delete(output2, false);
paths = OrcFile.mergeFiles(output2, OrcFile.writerOptions(conf),
Arrays.asList(input3, input4, input1, input5));
@@ -3401,7 +3485,7 @@ public class TestVectorOrcFile {
assertEquals(CompressionKind.LZO, reader.getCompressionKind());
assertEquals(20 * 1024, reader.getCompressionSize());
assertEquals(1000, reader.getRowIndexStride());
- assertEquals(OrcFile.Version.V_0_11, reader.getFileVersion());
+ assertEquals(fileFormat, reader.getFileVersion());
assertEquals(OrcFile.WriterVersion.HIVE_8732, reader.getWriterVersion());
assertEquals(2, reader.getStripes().size());
assertEquals(4, reader.getMetadataKeys().size());
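
[Editor's note] To summarize the behavior these assertions verify, a hedged sketch of the
merge API (paths are hypothetical): mergeFiles appends stripes from compatible
inputs without rewriting them and returns only the files it actually merged.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;

    public class MergeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path output = new Path("merged.orc");
        List<Path> inputs = Arrays.asList(new Path("part1.orc"),
            new Path("part2.orc"), new Path("part3.orc"));
        // Inputs whose schema, compression, or other options do not match
        // are skipped rather than merged, so the returned list may be
        // shorter than the input list; that is what the test checks.
        List<Path> merged = OrcFile.mergeFiles(output,
            OrcFile.writerOptions(conf), inputs);
        System.out.println("merged " + merged.size() + " of " + inputs.size());
      }
    }
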
@@ -3416,6 +3500,7 @@ public class TestVectorOrcFile {
@Test
public void testZeroByteOrcFile() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
Path zeroFile = new Path(exampleDir, "zero.orc");
Reader reader = OrcFile.createReader(zeroFile, OrcFile.readerOptions(conf));
assertEquals(0, reader.getNumberOfRows());
@@ -3437,6 +3522,7 @@ public class TestVectorOrcFile {
@Test
public void testFutureOrcFile() throws Exception {
+ Assume.assumeTrue(fileFormat == OrcFile.Version.V_0_11);
Path zeroFile = new Path(exampleDir, "version1999.orc");
try {
Reader reader = OrcFile.createReader(zeroFile, OrcFile.readerOptions(conf));
@@ -3456,7 +3542,7 @@ public class TestVectorOrcFile {
TypeDescription.fromString("struct<list1:array<double>," +
"list2:array<float>>");
Writer writer = OrcFile.createWriter(testFilePath,
- OrcFile.writerOptions(conf).setSchema(schema));
+ OrcFile.writerOptions(conf).setSchema(schema).version(fileFormat));
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 2;
ListColumnVector list1 = (ListColumnVector) batch.cols[0];
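
[Editor's note] Before the specification changes below, a minimal end-to-end sketch
(not part of the commit; path and values are hypothetical) tying the decimal
test changes together: write decimal(18,3) values under the experimental
format, then read them back through createRowBatchV2(), where the column
arrives as a Decimal64ColumnVector of unscaled longs.

    import java.math.BigDecimal;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class ShortDecimalRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("short-decimal.orc");
        TypeDescription schema =
            TypeDescription.fromString("struct<d:decimal(18,3)>");

        // Writing with UNSTABLE_PRE_2_0 selects the new RLE-based
        // encoding for short decimals (precision <= 18).
        Writer writer = OrcFile.createWriter(path,
            OrcFile.writerOptions(conf)
                .setSchema(schema)
                .version(OrcFile.Version.UNSTABLE_PRE_2_0));
        VectorizedRowBatch batch = schema.createRowBatch();
        DecimalColumnVector dcv = (DecimalColumnVector) batch.cols[0];
        for (int r = 0; r < 10; ++r) {
          dcv.set(r, HiveDecimal.create(r + ".123"));
        }
        batch.size = 10;
        writer.addRowBatch(batch);
        writer.close();

        // createRowBatchV2() materializes decimal(p<=18) columns as
        // Decimal64ColumnVector, whose long values are unscaled.
        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        VectorizedRowBatch read = schema.createRowBatchV2();
        Decimal64ColumnVector d64 = (Decimal64ColumnVector) read.cols[0];
        while (rows.nextBatch(read)) {
          for (int r = 0; r < read.size; ++r) {
            // scale 3 is fixed by the decimal(18,3) type.
            System.out.println(BigDecimal.valueOf(d64.vector[r], 3));
          }
        }
        rows.close();
      }
    }
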
http://git-wip-us.apache.org/repos/asf/orc/blob/45b9c508/site/specification/ORCv2.md
----------------------------------------------------------------------
diff --git a/site/specification/ORCv2.md b/site/specification/ORCv2.md
index b71b69b..74fc974 100644
--- a/site/specification/ORCv2.md
+++ b/site/specification/ORCv2.md
@@ -10,7 +10,7 @@ developers on the project.
The list of things that we plan to change:
-* Create a decimal representation with fixed scale using rle.
+* Move the decimal encoding to RLEv3 and remove the variable-length encoding.
* Create a better float/double encoding that splits mantissa and
exponent.
* Create a dictionary encoding for float, double, and decimal.
@@ -821,23 +821,20 @@ DIRECT_V2 | PRESENT | Yes | Boolean RLE
## Decimal Columns
-Decimal was introduced in Hive 0.11 with infinite precision (the total
-number of digits). In Hive 0.13, the definition was changed to limit
-the precision to a maximum of 38 digits, which conveniently uses 127
-bits plus a sign bit. The current encoding of decimal columns stores
-the integer representation of the value as an unbounded length zigzag
-encoded base 128 varint. The scale is stored in the SECONDARY stream
-as a signed integer.
+Since Hive 0.13, all decimals have had a fixed precision and scale.
+The goal is to encode the value with RLEv3 and take the fixed scale
+from the type. As an interim solution, we use RLE v2 for short decimals
+(precision <= 18) and keep the old encoding for long decimals.
Encoding | Stream Kind | Optional | Contents
:------------ | :-------------- | :------- | :-------
DIRECT | PRESENT | Yes | Boolean RLE
- | DATA | No | Unbounded base 128 varints
- | SECONDARY | No | Unsigned Integer RLE v1
+ | DATA | No | Signed Integer RLE v2
DIRECT_V2 | PRESENT | Yes | Boolean RLE
| DATA | No | Unbounded base 128 varints
| SECONDARY | No | Unsigned Integer RLE v2
+
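
[Editor's note] As an illustration of the interim encoding described above (a sketch, not
spec text): the DATA stream carries each value's unscaled integer, encoded
with signed RLE v2, and the scale is taken from the type, so the DIRECT
encoding no longer needs a SECONDARY stream.

    import java.math.BigDecimal;

    public class ShortDecimalMapping {
      // For decimal(p,s) with p <= 18 the writer stores the unscaled value;
      // the reader rebuilds the decimal from the type's fixed scale.
      static long toUnscaled(BigDecimal v, int scale) {
        // setScale throws if v has more fractional digits than the type
        // allows; values that conform to the declared scale always fit.
        return v.setScale(scale).unscaledValue().longValueExact();
      }

      static BigDecimal fromUnscaled(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
      }

      public static void main(String[] args) {
        int scale = 3;  // fixed by the type, e.g. decimal(18,3)
        long raw = toUnscaled(new BigDecimal("-2"), scale);
        System.out.println(raw);                      // -2000, what RLE v2 sees
        System.out.println(fromUnscaled(raw, scale)); // -2.000
      }
    }
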
## Date Columns
Date data is encoded with a PRESENT stream, a DATA stream that records