Posted to commits@hive.apache.org by om...@apache.org on 2015/12/14 22:36:14 UTC
[1/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
Repository: hive
Updated Branches:
refs/heads/master 49dc6452a -> 06e39ebe0
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java b/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
deleted file mode 100644
index 151f30d..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.util;
-
-/**
- * Estimation of memory footprint of object
- */
-public enum JavaDataModel {
-
- JAVA32 {
- @Override
- public int object() {
- return JAVA32_OBJECT;
- }
-
- @Override
- public int array() {
- return JAVA32_ARRAY;
- }
-
- @Override
- public int ref() {
- return JAVA32_REF;
- }
-
- @Override
- public int hashMap(int entry) {
- // base = JAVA32_OBJECT + PRIMITIVES1 * 4 + JAVA32_FIELDREF * 3 + JAVA32_ARRAY;
- // entry = JAVA32_OBJECT + JAVA32_FIELDREF + PRIMITIVES1
- return hashMapBase() + hashMapEntry() * entry;
- }
-
- @Override
- public int hashMapBase() {
- return 64;
- }
-
- @Override
- public int hashMapEntry() {
- return 24;
- }
-
- @Override
- public int hashSet(int entry) {
- // hashMap += JAVA32_OBJECT
- return hashSetBase() + hashSetEntry() * entry;
- }
-
- @Override
- public int hashSetBase() {
- return 80;
- }
-
- @Override
- public int hashSetEntry() {
- return 24;
- }
-
- @Override
- public int linkedHashMap(int entry) {
- // hashMap += JAVA32_FIELDREF + PRIMITIVES1
- // hashMap.entry += JAVA32_FIELDREF * 2
- return 72 + 32 * entry;
- }
-
- @Override
- public int linkedList(int entry) {
- // base = JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_FIELDREF;
- // entry = JAVA32_OBJECT + JAVA32_FIELDREF * 2
- return linkedListBase() + linkedListEntry() * entry;
- }
-
- @Override
- public int linkedListBase() {
- return 28;
- }
-
- @Override
- public int linkedListEntry() {
- return 24;
- }
-
- @Override
- public int arrayList() {
- // JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_ARRAY;
- return 44;
- }
-
- @Override
- public int memoryAlign() {
- return 8;
- }
- }, JAVA64 {
- @Override
- public int object() {
- return JAVA64_OBJECT;
- }
-
- @Override
- public int array() {
- return JAVA64_ARRAY;
- }
-
- @Override
- public int ref() {
- return JAVA64_REF;
- }
-
- @Override
- public int hashMap(int entry) {
- // base = JAVA64_OBJECT + PRIMITIVES1 * 4 + JAVA64_FIELDREF * 3 + JAVA64_ARRAY;
- // entry = JAVA64_OBJECT + JAVA64_FIELDREF + PRIMITIVES1
- return hashMapBase() + hashMapEntry() * entry;
- }
-
- @Override
- public int hashMapBase() {
- return 112;
- }
-
-
- @Override
- public int hashMapEntry() {
- return 44;
- }
-
- @Override
- public int hashSet(int entry) {
- // hashMap += JAVA64_OBJECT
- return hashSetBase() + hashSetEntry() * entry;
- }
-
- @Override
- public int hashSetBase() {
- return 144;
- }
-
- @Override
- public int hashSetEntry() {
- return 44;
- }
-
- @Override
- public int linkedHashMap(int entry) {
- // hashMap += JAVA64_FIELDREF + PRIMITIVES1
- // hashMap.entry += JAVA64_FIELDREF * 2
- return 128 + 60 * entry;
- }
-
- @Override
- public int linkedList(int entry) {
- // base = JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_FIELDREF;
- // entry = JAVA64_OBJECT + JAVA64_FIELDREF * 2
- return linkedListBase() + linkedListEntry() * entry;
- }
-
- @Override
- public int linkedListBase() {
- return 48;
- }
-
- @Override
- public int linkedListEntry() {
- return 48;
- }
-
- @Override
- public int arrayList() {
- // JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_ARRAY;
- return 80;
- }
-
- @Override
- public int memoryAlign() {
- return 8;
- }
- };
-
- public abstract int object();
- public abstract int array();
- public abstract int ref();
- public abstract int hashMap(int entry);
- public abstract int hashMapBase();
- public abstract int hashMapEntry();
- public abstract int hashSetBase();
- public abstract int hashSetEntry();
- public abstract int hashSet(int entry);
- public abstract int linkedHashMap(int entry);
- public abstract int linkedListBase();
- public abstract int linkedListEntry();
- public abstract int linkedList(int entry);
- public abstract int arrayList();
- public abstract int memoryAlign();
-
- // ascii string
- public int lengthFor(String string) {
- return lengthForStringOfLength(string.length());
- }
-
- public int lengthForRandom() {
- // boolean + double + AtomicLong
- return object() + primitive1() + primitive2() + object() + primitive2();
- }
-
- public int primitive1() {
- return PRIMITIVES1;
- }
- public int primitive2() {
- return PRIMITIVES2;
- }
-
- public static int alignUp(int value, int align) {
- return (value + align - 1) & ~(align - 1);
- }
-
- public static final int JAVA32_META = 12;
- public static final int JAVA32_ARRAY_META = 16;
- public static final int JAVA32_REF = 4;
- public static final int JAVA32_OBJECT = 16; // JAVA32_META + JAVA32_REF
- public static final int JAVA32_ARRAY = 20; // JAVA32_ARRAY_META + JAVA32_REF
-
- public static final int JAVA64_META = 24;
- public static final int JAVA64_ARRAY_META = 32;
- public static final int JAVA64_REF = 8;
- public static final int JAVA64_OBJECT = 32; // JAVA64_META + JAVA64_REF
- public static final int JAVA64_ARRAY = 40; // JAVA64_ARRAY_META + JAVA64_REF
-
- public static final int PRIMITIVES1 = 4; // void, boolean, byte, short, int, float
- public static final int PRIMITIVES2 = 8; // long, double
-
- public static final int PRIMITIVE_BYTE = 1; // byte
-
- private static JavaDataModel current;
-
- public static JavaDataModel get() {
- if (current != null) {
- return current;
- }
- try {
- String props = System.getProperty("sun.arch.data.model");
- if ("32".equals(props)) {
- return current = JAVA32;
- }
- } catch (Exception e) {
- // ignore
- }
- // TODO: separate model is needed for compressedOops, which can be guessed from memory size.
- return current = JAVA64;
- }
-
- public static int round(int size) {
- JavaDataModel model = get();
- if (model == JAVA32 || size % 8 == 0) {
- return size;
- }
- return ((size + 8) >> 3) << 3;
- }
-
- private int lengthForPrimitiveArrayOfSize(int primitiveSize, int length) {
- return alignUp(array() + primitiveSize*length, memoryAlign());
- }
-
- public int lengthForByteArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
- }
- public int lengthForObjectArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(ref(), length);
- }
- public int lengthForLongArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(primitive2(), length);
- }
- public int lengthForDoubleArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(primitive2(), length);
- }
- public int lengthForIntArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(primitive1(), length);
- }
- public int lengthForBooleanArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
- }
- public int lengthForTimestampArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(lengthOfTimestamp(), length);
- }
- public int lengthForDateArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(lengthOfDate(), length);
- }
- public int lengthForDecimalArrayOfSize(int length) {
- return lengthForPrimitiveArrayOfSize(lengthOfDecimal(), length);
- }
-
- public int lengthOfDecimal() {
- // object overhead + 8 bytes for intCompact + 4 bytes for precision
- // + 4 bytes for scale + size of BigInteger
- return object() + 2 * primitive2() + lengthOfBigInteger();
- }
-
- private int lengthOfBigInteger() {
- // object overhead + 4 bytes for bitCount + 4 bytes for bitLength
- // + 4 bytes for firstNonzeroByteNum + 4 bytes for firstNonzeroIntNum +
- // + 4 bytes for lowestSetBit + 5 bytes for size of magnitude (since max precision
- // is only 38 for HiveDecimal) + 7 bytes of padding (since java memory allocations
- // are 8 byte aligned)
- return object() + 4 * primitive2();
- }
-
- public int lengthOfTimestamp() {
- // object overhead + 4 bytes for int (nanos) + 4 bytes of padding
- return object() + primitive2();
- }
-
- public int lengthOfDate() {
- // object overhead + 8 bytes for long (fastTime) + 16 bytes for cdate
- return object() + 3 * primitive2();
- }
-
- public int lengthForStringOfLength(int strLen) {
- return object() + primitive1() * 3 + array() + strLen;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
index 40674ea..554033c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFileDump.java
@@ -153,8 +153,14 @@ public class TestFileDump {
(MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
- Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
- 100000, CompressionKind.ZLIB, 10000, 1000);
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf)
+ .fileSystem(fs)
+ .inspector(inspector)
+ .batchSize(1000)
+ .compress(CompressionKind.ZLIB)
+ .stripeSize(100000)
+ .rowIndexStride(1000));
Random r1 = new Random(1);
String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
"it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
@@ -263,8 +269,15 @@ public class TestFileDump {
Configuration conf = new Configuration();
conf.set(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname, "COMPRESSION");
conf.setFloat(HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, 0.49f);
- Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
- 100000, CompressionKind.ZLIB, 10000, 1000);
+ Writer writer = OrcFile.createWriter(testFilePath,
+ OrcFile.writerOptions(conf)
+ .fileSystem(fs)
+ .batchSize(1000)
+ .inspector(inspector)
+ .stripeSize(100000)
+ .compress(CompressionKind.ZLIB)
+ .rowIndexStride(1000)
+ .bufferSize(10000));
Random r1 = new Random(1);
String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
"it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
@@ -319,6 +332,7 @@ public class TestFileDump {
.compress(CompressionKind.ZLIB)
.bufferSize(10000)
.rowIndexStride(1000)
+ .batchSize(1000)
.bloomFilterColumns("S");
Writer writer = OrcFile.createWriter(testFilePath, options);
Random r1 = new Random(1);
@@ -368,7 +382,8 @@ public class TestFileDump {
.bufferSize(10000)
.rowIndexStride(1000)
.bloomFilterColumns("l")
- .bloomFilterFpp(0.01);
+ .bloomFilterFpp(0.01)
+ .batchSize(1000);
Writer writer = OrcFile.createWriter(testFilePath, options);
Random r1 = new Random(1);
String[] words = new String[]{"It", "was", "the", "best", "of", "times,",
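
The hunks above migrate TestFileDump from the deprecated positional overload,
OrcFile.createWriter(fs, path, conf, inspector, stripeSize, compress, bufferSize,
rowIndexStride), to the WriterOptions builder, and thread the new batchSize option
through each writer. As a rough sketch of the builder form (the Row class is a
hypothetical stand-in for the tests' MyRecord; the path and sizes are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

public class WriterOptionsSketch {
  // Hypothetical row type standing in for the tests' MyRecord.
  static class Row {
    int x;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf)
            .inspector(inspector)            // row schema via reflection
            .stripeSize(100000)              // target stripe size in bytes
            .bufferSize(10000)               // compression buffer size
            .rowIndexStride(1000)            // rows between row-index entries
            .batchSize(1000)                 // rows per internal batch (new option in this patch)
            .compress(CompressionKind.ZLIB));
    writer.close();
  }
}
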
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
index 2fd13c7..f41a7ba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java
@@ -1215,7 +1215,7 @@ public class TestNewIntegerEncoding {
.encodingStrategy(encodingStrategy));
List<Timestamp> tslist = Lists.newArrayList();
- tslist.add(Timestamp.valueOf("9999-01-01 00:00:00"));
+ tslist.add(Timestamp.valueOf("2099-01-01 00:00:00"));
tslist.add(Timestamp.valueOf("2003-01-01 00:00:00"));
tslist.add(Timestamp.valueOf("1999-01-01 00:00:00"));
tslist.add(Timestamp.valueOf("1995-01-01 00:00:00"));
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index ebe3096..a7e657c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -655,7 +655,8 @@ public class TestOrcFile {
OrcFile.writerOptions(conf)
.inspector(inspector)
.stripeSize(100000)
- .bufferSize(10000));
+ .bufferSize(10000)
+ .batchSize(1000));
for (int i = 0; i < 11000; i++) {
if (i >= 5000) {
if (i >= 10000) {
@@ -1260,6 +1261,7 @@ public class TestOrcFile {
.inspector(inspector)
.stripeSize(1000)
.compress(CompressionKind.NONE)
+ .batchSize(1000)
.bufferSize(100)
.blockPadding(false));
OrcStruct row = new OrcStruct(3);
@@ -1835,8 +1837,9 @@ public class TestOrcFile {
@Override
public void addedRow(int count) throws IOException {
rows += count;
- if (rows % 100 == 0) {
+ if (rows >= 100) {
callback.checkMemory(rate);
+ rows = 0;
}
}
}
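
The callback change above (rows % 100 == 0 becoming a threshold with a reset) fits
the batched writer: addedRow(int count) now bumps the counter by a whole batch at a
time, and a counter that grows in batch-sized steps can skip every exact multiple of
100. A toy illustration of the difference (not Hive code; the batch size is made up):

public class BatchCounterSketch {
  public static void main(String[] args) {
    int rows = 0;
    for (int batch : new int[]{64, 64, 64}) {
      rows += batch;
      // rows takes the values 64, 128, 192 -- never a multiple of 100,
      // so a (rows % 100 == 0) check would never fire on this sequence.
      if (rows >= 100) { // fires at 128 and resets, mirroring the new callback
        System.out.println("check memory at rows=" + rows);
        rows = 0;
      }
    }
  }
}
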
@@ -1858,6 +1861,7 @@ public class TestOrcFile {
.bufferSize(100)
.rowIndexStride(0)
.memory(memory)
+ .batchSize(100)
.version(OrcFile.Version.V_0_11));
assertEquals(testFilePath, memory.path);
for(int i=0; i < 2500; ++i) {
@@ -1894,6 +1898,7 @@ public class TestOrcFile {
.bufferSize(100)
.rowIndexStride(0)
.memory(memory)
+ .batchSize(100)
.version(OrcFile.Version.V_0_12));
assertEquals(testFilePath, memory.path);
for(int i=0; i < 2500; ++i) {
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 966621c..ab1d2aa 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -867,7 +867,7 @@ public class TestOrcRawRecordMerger {
Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"),
OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs)
.blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE)
- .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11));
+ .stripeSize(1).memory(mgr).batchSize(2).version(OrcFile.Version.V_0_11));
String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
"2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"};
for(int i=0; i < values.length; ++i) {
@@ -878,7 +878,8 @@ public class TestOrcRawRecordMerger {
// write a delta
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
- .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root);
+ .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5)
+ .finalDestination(root);
RecordUpdater ru = of.getRecordUpdater(root, options);
values = new String[]{"0.0", null, null, "1.1", null, null, null,
"ignore.7"};
@@ -972,7 +973,7 @@ public class TestOrcRawRecordMerger {
.bucket(BUCKET).inspector(inspector).filesystem(fs);
options.orcOptions(OrcFile.writerOptions(conf)
.stripeSize(1).blockPadding(false).compress(CompressionKind.NONE)
- .memory(mgr));
+ .memory(mgr).batchSize(2));
options.finalDestination(root);
RecordUpdater ru = of.getRecordUpdater(root, options);
String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3",
@@ -983,7 +984,8 @@ public class TestOrcRawRecordMerger {
ru.close(false);
// write a delta
- options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5);
+ options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
+ .recordIdColumn(5);
ru = of.getRecordUpdater(root, options);
values = new String[]{"0.0", null, null, "1.1", null, null, null,
"ignore.7"};
@@ -1020,7 +1022,7 @@ public class TestOrcRawRecordMerger {
// loop through the 5 splits and read each
for(int i=0; i < 4; ++i) {
- System.out.println("starting split " + i);
+ System.out.println("starting split " + i + " = " + splits[i]);
rr = inf.getRecordReader(splits[i], job, Reporter.NULL);
NullWritable key = rr.createKey();
OrcStruct value = rr.createValue();
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
index a409be8..6803abd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.orc.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump-bloomfilter.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter.out b/ql/src/test/resources/orc-file-dump-bloomfilter.out
index 7c3db78..1654e33 100644
--- a/ql/src/test/resources/orc-file-dump-bloomfilter.out
+++ b/ql/src/test/resources/orc-file-dump-bloomfilter.out
@@ -1,5 +1,5 @@
Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 21000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump-bloomfilter2.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter2.out b/ql/src/test/resources/orc-file-dump-bloomfilter2.out
index a4f006b..1f6e046 100644
--- a/ql/src/test/resources/orc-file-dump-bloomfilter2.out
+++ b/ql/src/test/resources/orc-file-dump-bloomfilter2.out
@@ -1,5 +1,5 @@
Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 21000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
index 8ad856d..64cf0e9 100644
--- a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
+++ b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
@@ -1,5 +1,5 @@
Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 21000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump.json
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump.json b/ql/src/test/resources/orc-file-dump.json
index 25fd63b..0376d8a 100644
--- a/ql/src/test/resources/orc-file-dump.json
+++ b/ql/src/test/resources/orc-file-dump.json
@@ -1,7 +1,7 @@
{
"fileName": "TestFileDump.testDump.orc",
"fileVersion": "0.12",
- "writerVersion": "HIVE_4243",
+ "writerVersion": "HIVE_12055",
"numberOfRows": 21000,
"compression": "ZLIB",
"compressionBufferSize": 4096,
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-dump.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-dump.out b/ql/src/test/resources/orc-file-dump.out
index 5aaa0f3..57356d3 100644
--- a/ql/src/test/resources/orc-file-dump.out
+++ b/ql/src/test/resources/orc-file-dump.out
@@ -1,5 +1,5 @@
Structure for TestFileDump.testDump.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 21000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/resources/orc-file-has-null.out
----------------------------------------------------------------------
diff --git a/ql/src/test/resources/orc-file-has-null.out b/ql/src/test/resources/orc-file-has-null.out
index 438c27c..0e915c6 100644
--- a/ql/src/test/resources/orc-file-has-null.out
+++ b/ql/src/test/resources/orc-file-has-null.out
@@ -1,5 +1,5 @@
Structure for TestOrcFile.testHasNull.orc
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 20000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/orc_file_dump.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_file_dump.q.out b/ql/src/test/results/clientpositive/orc_file_dump.q.out
index 43c38a8..4c73bac 100644
--- a/ql/src/test/results/clientpositive/orc_file_dump.q.out
+++ b/ql/src/test/results/clientpositive/orc_file_dump.q.out
@@ -93,7 +93,7 @@ PREHOOK: Input: default@orc_ppd
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 1049
Compression: ZLIB
Compression size: 262144
@@ -213,7 +213,7 @@ PREHOOK: Input: default@orc_ppd
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 1049
Compression: ZLIB
Compression size: 262144
@@ -345,7 +345,7 @@ PREHOOK: Input: default@orc_ppd_part@ds=2015/hr=10
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 1049
Compression: ZLIB
Compression size: 262144
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/orc_merge10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_merge10.q.out b/ql/src/test/results/clientpositive/orc_merge10.q.out
index a415776..776ca9a 100644
--- a/ql/src/test/results/clientpositive/orc_merge10.q.out
+++ b/ql/src/test/results/clientpositive/orc_merge10.q.out
@@ -517,7 +517,7 @@ PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 242
Compression: SNAPPY
Compression size: 4096
@@ -579,7 +579,7 @@ PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 242
Compression: SNAPPY
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/orc_merge11.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/orc_merge11.q.out b/ql/src/test/results/clientpositive/orc_merge11.q.out
index a7e3d47..65e3d8b 100644
--- a/ql/src/test/results/clientpositive/orc_merge11.q.out
+++ b/ql/src/test/results/clientpositive/orc_merge11.q.out
@@ -72,7 +72,7 @@ PREHOOK: Input: default@orcfile_merge1
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 50000
Compression: ZLIB
Compression size: 4096
@@ -133,7 +133,7 @@ ________________________________________________________________________________
-- END ORC FILE DUMP --
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 50000
Compression: ZLIB
Compression size: 4096
@@ -217,7 +217,7 @@ PREHOOK: Input: default@orcfile_merge1
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 100000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/orc_merge10.q.out b/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
index d41671a..8b6a595 100644
--- a/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
+++ b/ql/src/test/results/clientpositive/tez/orc_merge10.q.out
@@ -552,7 +552,7 @@ PREHOOK: Input: default@orcfile_merge1@ds=1/part=0
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 242
Compression: SNAPPY
Compression size: 4096
@@ -629,7 +629,7 @@ PREHOOK: Input: default@orcfile_merge1c@ds=1/part=0
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 242
Compression: SNAPPY
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/orc_merge11.q.out b/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
index a7e3d47..65e3d8b 100644
--- a/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
+++ b/ql/src/test/results/clientpositive/tez/orc_merge11.q.out
@@ -72,7 +72,7 @@ PREHOOK: Input: default@orcfile_merge1
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 50000
Compression: ZLIB
Compression size: 4096
@@ -133,7 +133,7 @@ ________________________________________________________________________________
-- END ORC FILE DUMP --
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 50000
Compression: ZLIB
Compression size: 4096
@@ -217,7 +217,7 @@ PREHOOK: Input: default@orcfile_merge1
#### A masked pattern was here ####
-- BEGIN ORC FILE DUMP --
#### A masked pattern was here ####
-File Version: 0.12 with HIVE_4243
+File Version: 0.12 with HIVE_12055
Rows: 100000
Compression: ZLIB
Compression size: 4096
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java b/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
new file mode 100644
index 0000000..151f30d
--- /dev/null
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.util;
+
+/**
+ * Estimation of memory footprint of object
+ */
+public enum JavaDataModel {
+
+ JAVA32 {
+ @Override
+ public int object() {
+ return JAVA32_OBJECT;
+ }
+
+ @Override
+ public int array() {
+ return JAVA32_ARRAY;
+ }
+
+ @Override
+ public int ref() {
+ return JAVA32_REF;
+ }
+
+ @Override
+ public int hashMap(int entry) {
+ // base = JAVA32_OBJECT + PRIMITIVES1 * 4 + JAVA32_FIELDREF * 3 + JAVA32_ARRAY;
+ // entry = JAVA32_OBJECT + JAVA32_FIELDREF + PRIMITIVES1
+ return hashMapBase() + hashMapEntry() * entry;
+ }
+
+ @Override
+ public int hashMapBase() {
+ return 64;
+ }
+
+ @Override
+ public int hashMapEntry() {
+ return 24;
+ }
+
+ @Override
+ public int hashSet(int entry) {
+ // hashMap += JAVA32_OBJECT
+ return hashSetBase() + hashSetEntry() * entry;
+ }
+
+ @Override
+ public int hashSetBase() {
+ return 80;
+ }
+
+ @Override
+ public int hashSetEntry() {
+ return 24;
+ }
+
+ @Override
+ public int linkedHashMap(int entry) {
+ // hashMap += JAVA32_FIELDREF + PRIMITIVES1
+ // hashMap.entry += JAVA32_FIELDREF * 2
+ return 72 + 32 * entry;
+ }
+
+ @Override
+ public int linkedList(int entry) {
+ // base = JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_FIELDREF;
+ // entry = JAVA32_OBJECT + JAVA32_FIELDREF * 2
+ return linkedListBase() + linkedListEntry() * entry;
+ }
+
+ @Override
+ public int linkedListBase() {
+ return 28;
+ }
+
+ @Override
+ public int linkedListEntry() {
+ return 24;
+ }
+
+ @Override
+ public int arrayList() {
+ // JAVA32_OBJECT + PRIMITIVES1 * 2 + JAVA32_ARRAY;
+ return 44;
+ }
+
+ @Override
+ public int memoryAlign() {
+ return 8;
+ }
+ }, JAVA64 {
+ @Override
+ public int object() {
+ return JAVA64_OBJECT;
+ }
+
+ @Override
+ public int array() {
+ return JAVA64_ARRAY;
+ }
+
+ @Override
+ public int ref() {
+ return JAVA64_REF;
+ }
+
+ @Override
+ public int hashMap(int entry) {
+ // base = JAVA64_OBJECT + PRIMITIVES1 * 4 + JAVA64_FIELDREF * 3 + JAVA64_ARRAY;
+ // entry = JAVA64_OBJECT + JAVA64_FIELDREF + PRIMITIVES1
+ return hashMapBase() + hashMapEntry() * entry;
+ }
+
+ @Override
+ public int hashMapBase() {
+ return 112;
+ }
+
+
+ @Override
+ public int hashMapEntry() {
+ return 44;
+ }
+
+ @Override
+ public int hashSet(int entry) {
+ // hashMap += JAVA64_OBJECT
+ return hashSetBase() + hashSetEntry() * entry;
+ }
+
+ @Override
+ public int hashSetBase() {
+ return 144;
+ }
+
+ @Override
+ public int hashSetEntry() {
+ return 44;
+ }
+
+ @Override
+ public int linkedHashMap(int entry) {
+ // hashMap += JAVA64_FIELDREF + PRIMITIVES1
+ // hashMap.entry += JAVA64_FIELDREF * 2
+ return 128 + 60 * entry;
+ }
+
+ @Override
+ public int linkedList(int entry) {
+ // base = JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_FIELDREF;
+ // entry = JAVA64_OBJECT + JAVA64_FIELDREF * 2
+ return linkedListBase() + linkedListEntry() * entry;
+ }
+
+ @Override
+ public int linkedListBase() {
+ return 48;
+ }
+
+ @Override
+ public int linkedListEntry() {
+ return 48;
+ }
+
+ @Override
+ public int arrayList() {
+ // JAVA64_OBJECT + PRIMITIVES1 * 2 + JAVA64_ARRAY;
+ return 80;
+ }
+
+ @Override
+ public int memoryAlign() {
+ return 8;
+ }
+ };
+
+ public abstract int object();
+ public abstract int array();
+ public abstract int ref();
+ public abstract int hashMap(int entry);
+ public abstract int hashMapBase();
+ public abstract int hashMapEntry();
+ public abstract int hashSetBase();
+ public abstract int hashSetEntry();
+ public abstract int hashSet(int entry);
+ public abstract int linkedHashMap(int entry);
+ public abstract int linkedListBase();
+ public abstract int linkedListEntry();
+ public abstract int linkedList(int entry);
+ public abstract int arrayList();
+ public abstract int memoryAlign();
+
+ // ascii string
+ public int lengthFor(String string) {
+ return lengthForStringOfLength(string.length());
+ }
+
+ public int lengthForRandom() {
+ // boolean + double + AtomicLong
+ return object() + primitive1() + primitive2() + object() + primitive2();
+ }
+
+ public int primitive1() {
+ return PRIMITIVES1;
+ }
+ public int primitive2() {
+ return PRIMITIVES2;
+ }
+
+ public static int alignUp(int value, int align) {
+ return (value + align - 1) & ~(align - 1);
+ }
+
+ public static final int JAVA32_META = 12;
+ public static final int JAVA32_ARRAY_META = 16;
+ public static final int JAVA32_REF = 4;
+ public static final int JAVA32_OBJECT = 16; // JAVA32_META + JAVA32_REF
+ public static final int JAVA32_ARRAY = 20; // JAVA32_ARRAY_META + JAVA32_REF
+
+ public static final int JAVA64_META = 24;
+ public static final int JAVA64_ARRAY_META = 32;
+ public static final int JAVA64_REF = 8;
+ public static final int JAVA64_OBJECT = 32; // JAVA64_META + JAVA64_REF
+ public static final int JAVA64_ARRAY = 40; // JAVA64_ARRAY_META + JAVA64_REF
+
+ public static final int PRIMITIVES1 = 4; // void, boolean, byte, short, int, float
+ public static final int PRIMITIVES2 = 8; // long, double
+
+ public static final int PRIMITIVE_BYTE = 1; // byte
+
+ private static JavaDataModel current;
+
+ public static JavaDataModel get() {
+ if (current != null) {
+ return current;
+ }
+ try {
+ String props = System.getProperty("sun.arch.data.model");
+ if ("32".equals(props)) {
+ return current = JAVA32;
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ // TODO: separate model is needed for compressedOops, which can be guessed from memory size.
+ return current = JAVA64;
+ }
+
+ public static int round(int size) {
+ JavaDataModel model = get();
+ if (model == JAVA32 || size % 8 == 0) {
+ return size;
+ }
+ return ((size + 8) >> 3) << 3;
+ }
+
+ private int lengthForPrimitiveArrayOfSize(int primitiveSize, int length) {
+ return alignUp(array() + primitiveSize*length, memoryAlign());
+ }
+
+ public int lengthForByteArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
+ }
+ public int lengthForObjectArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(ref(), length);
+ }
+ public int lengthForLongArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(primitive2(), length);
+ }
+ public int lengthForDoubleArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(primitive2(), length);
+ }
+ public int lengthForIntArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(primitive1(), length);
+ }
+ public int lengthForBooleanArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(PRIMITIVE_BYTE, length);
+ }
+ public int lengthForTimestampArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(lengthOfTimestamp(), length);
+ }
+ public int lengthForDateArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(lengthOfDate(), length);
+ }
+ public int lengthForDecimalArrayOfSize(int length) {
+ return lengthForPrimitiveArrayOfSize(lengthOfDecimal(), length);
+ }
+
+ public int lengthOfDecimal() {
+ // object overhead + 8 bytes for intCompact + 4 bytes for precision
+ // + 4 bytes for scale + size of BigInteger
+ return object() + 2 * primitive2() + lengthOfBigInteger();
+ }
+
+ private int lengthOfBigInteger() {
+ // object overhead + 4 bytes for bitCount + 4 bytes for bitLength
+ // + 4 bytes for firstNonzeroByteNum + 4 bytes for firstNonzeroIntNum +
+ // + 4 bytes for lowestSetBit + 5 bytes for size of magnitude (since max precision
+ // is only 38 for HiveDecimal) + 7 bytes of padding (since java memory allocations
+ // are 8 byte aligned)
+ return object() + 4 * primitive2();
+ }
+
+ public int lengthOfTimestamp() {
+ // object overhead + 4 bytes for int (nanos) + 4 bytes of padding
+ return object() + primitive2();
+ }
+
+ public int lengthOfDate() {
+ // object overhead + 8 bytes for long (fastTime) + 16 bytes for cdate
+ return object() + 3 * primitive2();
+ }
+
+ public int lengthForStringOfLength(int strLen) {
+ return object() + primitive1() * 3 + array() + strLen;
+ }
+}
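
JavaDataModel moves verbatim from ql into storage-api (the 335-line deletion earlier
in this message and the 335-line addition here are the same file), so the orc module
can estimate in-memory sizes without depending on ql. A small sketch of how the enum
is typically consulted (the figures in the comments are the JAVA64 constants defined
above):

import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class FootprintSketch {
  public static void main(String[] args) {
    // Picks JAVA32 or JAVA64 based on the sun.arch.data.model system property.
    JavaDataModel model = JavaDataModel.get();

    // HashMap estimate: base + perEntry * n, e.g. 112 + 44 * 100 = 4512 on JAVA64.
    int mapBytes = model.hashMap(100);

    // ASCII String estimate: object header + 3 ints + array header + payload,
    // e.g. 32 + 12 + 40 + 38 = 122 bytes for a 38-character string on JAVA64.
    int strBytes = model.lengthForStringOfLength(38);

    System.out.println(mapBytes + " bytes, " + strBytes + " bytes");
  }
}
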
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
new file mode 100644
index 0000000..bb0b8f2
--- /dev/null
+++ b/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * BloomFilter is a probabilistic data structure for set membership checks. Bloom filters are
+ * highly space efficient compared to a HashSet. Because of their probabilistic nature, false
+ * positives (test() returns true for an element that was never added) are possible, but false
+ * negatives are not (if an element is present, test() will never return false). The false
+ * positive probability is configurable (default: 5%), and the storage requirement rises as the
+ * target probability falls: the lower the false positive probability, the greater the space
+ * requirement.
+ * Bloom filters are sensitive to the number of elements that will be inserted, so the expected
+ * number of entries must be specified when the filter is created. If the number of insertions
+ * exceeds that estimate, the false positive probability increases accordingly.
+ *
+ * Internally, this implementation uses the Murmur3 fast non-cryptographic hash algorithm.
+ * Although Murmur2 is slightly faster than Murmur3 in Java, it suffers from hash collisions
+ * for specific sequences of repeating bytes. See the following link for more info:
+ * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw
+ */
+public class BloomFilter {
+ public static final double DEFAULT_FPP = 0.05;
+ protected BitSet bitSet;
+ protected int numBits;
+ protected int numHashFunctions;
+
+ public BloomFilter() {
+ }
+
+ public BloomFilter(long expectedEntries) {
+ this(expectedEntries, DEFAULT_FPP);
+ }
+
+ public BloomFilter(long expectedEntries, double fpp) {
+ checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
+ checkArgument(fpp > 0.0 && fpp < 1.0, "False positive probability should be > 0.0 & < 1.0");
+ int nb = optimalNumOfBits(expectedEntries, fpp);
+ // make 'm' multiple of 64
+ this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
+ this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, numBits);
+ this.bitSet = new BitSet(numBits);
+ }
+
+ /**
+ * A constructor to support rebuilding the BloomFilter from a serialized representation.
+ * @param bits - the serialized bit set, as a list of longs
+ * @param numBits - number of bits in the filter
+ * @param numFuncs - number of hash functions
+ */
+ public BloomFilter(List<Long> bits, int numBits, int numFuncs) {
+ super();
+ long[] copied = new long[bits.size()];
+ for (int i = 0; i < bits.size(); i++) copied[i] = bits.get(i);
+ bitSet = new BitSet(copied);
+ this.numBits = numBits;
+ numHashFunctions = numFuncs;
+ }
+
+ static int optimalNumOfHashFunctions(long n, long m) {
+ return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
+ }
+
+ static int optimalNumOfBits(long n, double p) {
+ return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
+ }
+
+ public void add(byte[] val) {
+ if (val == null) {
+ addBytes(val, -1, -1);
+ } else {
+ addBytes(val, 0, val.length);
+ }
+ }
+
+ public void addBytes(byte[] val, int offset, int length) {
+ // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
+ // by Kirsch et al.: from the abstract, "only two hash functions are necessary to effectively
+ // implement a Bloom filter without any loss in the asymptotic false positive probability".
+
+ // Split the 64-bit hashcode into two 32-bit hash codes and employ the technique described
+ // in the paper above.
+ long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+ Murmur3.hash64(val, offset, length);
+ addHash(hash64);
+ }
+
+ private void addHash(long hash64) {
+ int hash1 = (int) hash64;
+ int hash2 = (int) (hash64 >>> 32);
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % numBits;
+ bitSet.set(pos);
+ }
+ }
+
+ public void addString(String val) {
+ if (val == null) {
+ add(null);
+ } else {
+ add(val.getBytes());
+ }
+ }
+
+ public void addLong(long val) {
+ addHash(getLongHash(val));
+ }
+
+ public void addDouble(double val) {
+ addLong(Double.doubleToLongBits(val));
+ }
+
+ public boolean test(byte[] val) {
+ if (val == null) {
+ return testBytes(val, -1, -1);
+ }
+ return testBytes(val, 0, val.length);
+ }
+
+ public boolean testBytes(byte[] val, int offset, int length) {
+ long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+ Murmur3.hash64(val, offset, length);
+ return testHash(hash64);
+ }
+
+ private boolean testHash(long hash64) {
+ int hash1 = (int) hash64;
+ int hash2 = (int) (hash64 >>> 32);
+
+ for (int i = 1; i <= numHashFunctions; i++) {
+ int combinedHash = hash1 + (i * hash2);
+ // hashcode should be positive, flip all the bits if it's negative
+ if (combinedHash < 0) {
+ combinedHash = ~combinedHash;
+ }
+ int pos = combinedHash % numBits;
+ if (!bitSet.get(pos)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public boolean testString(String val) {
+ if (val == null) {
+ return test(null);
+ } else {
+ return test(val.getBytes());
+ }
+ }
+
+ public boolean testLong(long val) {
+ return testHash(getLongHash(val));
+ }
+
+ // Thomas Wang's integer hash function
+ // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+ private long getLongHash(long key) {
+ key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+ key = key ^ (key >> 24);
+ key = (key + (key << 3)) + (key << 8); // key * 265
+ key = key ^ (key >> 14);
+ key = (key + (key << 2)) + (key << 4); // key * 21
+ key = key ^ (key >> 28);
+ key = key + (key << 31);
+ return key;
+ }
+
+ public boolean testDouble(double val) {
+ return testLong(Double.doubleToLongBits(val));
+ }
+
+ public long sizeInBytes() {
+ return getBitSize() / 8;
+ }
+
+ public int getBitSize() {
+ return bitSet.getData().length * Long.SIZE;
+ }
+
+ public int getNumHashFunctions() {
+ return numHashFunctions;
+ }
+
+ public long[] getBitSet() {
+ return bitSet.getData();
+ }
+
+ @Override
+ public String toString() {
+ return "m: " + numBits + " k: " + numHashFunctions;
+ }
+
+ /**
+ * Merge the specified bloom filter with current bloom filter.
+ *
+ * @param that - bloom filter to merge
+ */
+ public void merge(BloomFilter that) {
+ if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
+ this.bitSet.putAll(that.bitSet);
+ } else {
+ throw new IllegalArgumentException("BloomFilters are not compatible for merging." +
+ " this - " + this.toString() + " that - " + that.toString());
+ }
+ }
+
+ public void reset() {
+ this.bitSet.clear();
+ }
+
+ /**
+ * Bare metal bit set implementation. For performance reasons, this implementation does not check
+ * for index bounds nor expand the bit set size if the specified index is greater than the size.
+ */
+ public class BitSet {
+ private final long[] data;
+
+ public BitSet(long bits) {
+ this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
+ }
+
+ /**
+ * Deserialize long array as bit set.
+ *
+ * @param data - bit array
+ */
+ public BitSet(long[] data) {
+ assert data.length > 0 : "data length is zero!";
+ this.data = data;
+ }
+
+ /**
+ * Sets the bit at specified index.
+ *
+ * @param index - position
+ */
+ public void set(int index) {
+ data[index >>> 6] |= (1L << index);
+ }
+
+ /**
+ * Returns true if the bit is set in the specified index.
+ *
+ * @param index - position
+ * @return - value at the bit position
+ */
+ public boolean get(int index) {
+ return (data[index >>> 6] & (1L << index)) != 0;
+ }
+
+ /**
+ * Number of bits
+ */
+ public long bitSize() {
+ return (long) data.length * Long.SIZE;
+ }
+
+ public long[] getData() {
+ return data;
+ }
+
+ /**
+ * Combines the two BitArrays using bitwise OR.
+ */
+ public void putAll(BitSet array) {
+ assert data.length == array.data.length :
+ "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
+ for (int i = 0; i < data.length; i++) {
+ data[i] |= array.data[i];
+ }
+ }
+
+ /**
+ * Clear the bit set.
+ */
+ public void clear() {
+ Arrays.fill(data, 0);
+ }
+ }
+}
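
BloomFilter is added to storage-api alongside Murmur3, which it uses for hashing
(see addBytes/testBytes above). A short usage sketch of its add/test API:

import org.apache.hive.common.util.BloomFilter;

public class BloomFilterSketch {
  public static void main(String[] args) {
    // Sized for ~10000 entries at the default 5% false positive probability.
    BloomFilter bf = new BloomFilter(10000);
    bf.addString("alice");
    bf.addLong(42L);

    System.out.println(bf.testString("alice")); // true -- no false negatives
    System.out.println(bf.testLong(42L));       // true
    System.out.println(bf.testString("bob"));   // usually false; true only on a false positive
  }
}
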
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/java/org/apache/hive/common/util/Murmur3.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hive/common/util/Murmur3.java b/storage-api/src/java/org/apache/hive/common/util/Murmur3.java
new file mode 100644
index 0000000..88c3514
--- /dev/null
+++ b/storage-api/src/java/org/apache/hive/common/util/Murmur3.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+/**
+ * Murmur3 is the successor to the Murmur2 family of fast non-cryptographic hash algorithms.
+ *
+ * This class provides the Murmur3 32- and 128-bit variants.
+ * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
+ * 128-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
+ *
+ * This is public domain code with no copyrights.
+ * From the homepage of MurmurHash (https://code.google.com/p/smhasher/):
+ * "All MurmurHash versions are public domain software, and the author disclaims all copyright
+ * to their code."
+ */
+public class Murmur3 {
+ // from 64-bit linear congruential generator
+ public static final long NULL_HASHCODE = 2862933555777941757L;
+
+ // Constants for 32 bit variant
+ private static final int C1_32 = 0xcc9e2d51;
+ private static final int C2_32 = 0x1b873593;
+ private static final int R1_32 = 15;
+ private static final int R2_32 = 13;
+ private static final int M_32 = 5;
+ private static final int N_32 = 0xe6546b64;
+
+ // Constants for 128 bit variant
+ private static final long C1 = 0x87c37b91114253d5L;
+ private static final long C2 = 0x4cf5ad432745937fL;
+ private static final int R1 = 31;
+ private static final int R2 = 27;
+ private static final int R3 = 33;
+ private static final int M = 5;
+ private static final int N1 = 0x52dce729;
+ private static final int N2 = 0x38495ab5;
+
+ private static final int DEFAULT_SEED = 104729;
+
+ /**
+ * Murmur3 32-bit variant.
+ *
+ * @param data - input byte array
+ * @return - hashcode
+ */
+ public static int hash32(byte[] data) {
+ return hash32(data, data.length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 32-bit variant.
+ *
+ * @param data - input byte array
+ * @param length - length of array
+ * @param seed - seed (the no-seed overload uses DEFAULT_SEED)
+ * @return - hashcode
+ */
+ public static int hash32(byte[] data, int length, int seed) {
+ int hash = seed;
+ final int nblocks = length >> 2;
+
+ // body
+ for (int i = 0; i < nblocks; i++) {
+ int i_4 = i << 2;
+ int k = (data[i_4] & 0xff)
+ | ((data[i_4 + 1] & 0xff) << 8)
+ | ((data[i_4 + 2] & 0xff) << 16)
+ | ((data[i_4 + 3] & 0xff) << 24);
+
+ // mix functions
+ k *= C1_32;
+ k = Integer.rotateLeft(k, R1_32);
+ k *= C2_32;
+ hash ^= k;
+ hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
+ }
+
+ // tail
+ int idx = nblocks << 2;
+ int k1 = 0;
+ switch (length - idx) {
+ case 3:
+ k1 ^= data[idx + 2] << 16;
+ case 2:
+ k1 ^= data[idx + 1] << 8;
+ case 1:
+ k1 ^= data[idx];
+
+ // mix functions
+ k1 *= C1_32;
+ k1 = Integer.rotateLeft(k1, R1_32);
+ k1 *= C2_32;
+ hash ^= k1;
+ }
+
+ // finalization
+ hash ^= length;
+ hash ^= (hash >>> 16);
+ hash *= 0x85ebca6b;
+ hash ^= (hash >>> 13);
+ hash *= 0xc2b2ae35;
+ hash ^= (hash >>> 16);
+
+ return hash;
+ }
+
+ /**
+ * Murmur3 64-bit variant. This is essentially the MSB 8 bytes of the Murmur3 128-bit variant.
+ *
+ * @param data - input byte array
+ * @return - hashcode
+ */
+ public static long hash64(byte[] data) {
+ return hash64(data, 0, data.length, DEFAULT_SEED);
+ }
+
+ public static long hash64(byte[] data, int offset, int length) {
+ return hash64(data, offset, length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 64-bit variant. This is essentially the MSB 8 bytes of the Murmur3 128-bit variant.
+ *
+ * @param data - input byte array
+ * @param offset - offset of the first element
+ * @param length - length of array
+ * @param seed - seed (the no-seed overloads use DEFAULT_SEED)
+ * @return - hashcode
+ */
+ public static long hash64(byte[] data, int offset, int length, int seed) {
+ long hash = seed;
+ final int nblocks = length >> 3;
+
+ // body
+ for (int i = 0; i < nblocks; i++) {
+ final int i8 = i << 3;
+ long k = ((long) data[offset + i8] & 0xff)
+ | (((long) data[offset + i8 + 1] & 0xff) << 8)
+ | (((long) data[offset + i8 + 2] & 0xff) << 16)
+ | (((long) data[offset + i8 + 3] & 0xff) << 24)
+ | (((long) data[offset + i8 + 4] & 0xff) << 32)
+ | (((long) data[offset + i8 + 5] & 0xff) << 40)
+ | (((long) data[offset + i8 + 6] & 0xff) << 48)
+ | (((long) data[offset + i8 + 7] & 0xff) << 56);
+
+ // mix functions
+ k *= C1;
+ k = Long.rotateLeft(k, R1);
+ k *= C2;
+ hash ^= k;
+ hash = Long.rotateLeft(hash, R2) * M + N1;
+ }
+
+ // tail
+ long k1 = 0;
+ int tailStart = nblocks << 3;
+ switch (length - tailStart) {
+ case 7:
+ k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
+ case 6:
+ k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
+ case 5:
+ k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
+ case 4:
+ k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
+ case 3:
+ k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
+ case 2:
+ k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
+ case 1:
+ k1 ^= ((long) data[offset + tailStart] & 0xff);
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ hash ^= k1;
+ }
+
+ // finalization
+ hash ^= length;
+ hash = fmix64(hash);
+
+ return hash;
+ }
+
+ /**
+ * Murmur3 128-bit variant.
+ *
+ * @param data - input byte array
+ * @return - hashcode (2 longs)
+ */
+ public static long[] hash128(byte[] data) {
+ return hash128(data, 0, data.length, DEFAULT_SEED);
+ }
+
+ /**
+ * Murmur3 128-bit variant.
+ *
+ * @param data - input byte array
+ * @param offset - offset of the first element
+ * @param length - length of array
+ * @param seed - seed (the no-seed overload uses DEFAULT_SEED)
+ * @return - hashcode (2 longs)
+ */
+ public static long[] hash128(byte[] data, int offset, int length, int seed) {
+ long h1 = seed;
+ long h2 = seed;
+ final int nblocks = length >> 4;
+
+ // body
+ for (int i = 0; i < nblocks; i++) {
+ final int i16 = i << 4;
+ long k1 = ((long) data[offset + i16] & 0xff)
+ | (((long) data[offset + i16 + 1] & 0xff) << 8)
+ | (((long) data[offset + i16 + 2] & 0xff) << 16)
+ | (((long) data[offset + i16 + 3] & 0xff) << 24)
+ | (((long) data[offset + i16 + 4] & 0xff) << 32)
+ | (((long) data[offset + i16 + 5] & 0xff) << 40)
+ | (((long) data[offset + i16 + 6] & 0xff) << 48)
+ | (((long) data[offset + i16 + 7] & 0xff) << 56);
+
+ long k2 = ((long) data[offset + i16 + 8] & 0xff)
+ | (((long) data[offset + i16 + 9] & 0xff) << 8)
+ | (((long) data[offset + i16 + 10] & 0xff) << 16)
+ | (((long) data[offset + i16 + 11] & 0xff) << 24)
+ | (((long) data[offset + i16 + 12] & 0xff) << 32)
+ | (((long) data[offset + i16 + 13] & 0xff) << 40)
+ | (((long) data[offset + i16 + 14] & 0xff) << 48)
+ | (((long) data[offset + i16 + 15] & 0xff) << 56);
+
+ // mix functions for k1
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ h1 ^= k1;
+ h1 = Long.rotateLeft(h1, R2);
+ h1 += h2;
+ h1 = h1 * M + N1;
+
+ // mix functions for k2
+ k2 *= C2;
+ k2 = Long.rotateLeft(k2, R3);
+ k2 *= C1;
+ h2 ^= k2;
+ h2 = Long.rotateLeft(h2, R1);
+ h2 += h1;
+ h2 = h2 * M + N2;
+ }
+
+ // tail
+ long k1 = 0;
+ long k2 = 0;
+ int tailStart = nblocks << 4;
+ switch (length - tailStart) {
+ case 15:
+ k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
+ case 14:
+ k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
+ case 13:
+ k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
+ case 12:
+ k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
+ case 11:
+ k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
+ case 10:
+ k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
+ case 9:
+ k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
+ k2 *= C2;
+ k2 = Long.rotateLeft(k2, R3);
+ k2 *= C1;
+ h2 ^= k2;
+
+ case 8:
+ k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
+ case 7:
+ k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
+ case 6:
+ k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
+ case 5:
+ k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
+ case 4:
+ k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
+ case 3:
+ k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
+ case 2:
+ k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
+ case 1:
+ k1 ^= (long) (data[offset + tailStart] & 0xff);
+ k1 *= C1;
+ k1 = Long.rotateLeft(k1, R1);
+ k1 *= C2;
+ h1 ^= k1;
+ }
+
+ // finalization
+ h1 ^= length;
+ h2 ^= length;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = fmix64(h1);
+ h2 = fmix64(h2);
+
+ h1 += h2;
+ h2 += h1;
+
+ return new long[]{h1, h2};
+ }
+
+ private static long fmix64(long h) {
+ h ^= (h >>> 33);
+ h *= 0xff51afd7ed558ccdL;
+ h ^= (h >>> 33);
+ h *= 0xc4ceb9fe1a85ec53L;
+ h ^= (h >>> 33);
+ return h;
+ }
+}
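
Murmur3 is the hash function BloomFilter delegates to. A small sketch exercising the
three variants defined in this file:

import java.util.Arrays;

import org.apache.hive.common.util.Murmur3;

public class Murmur3Sketch {
  public static void main(String[] args) {
    byte[] data = "hello".getBytes();
    int h32 = Murmur3.hash32(data);      // 32-bit variant with DEFAULT_SEED
    long h64 = Murmur3.hash64(data);     // MSB 8 bytes of the 128-bit variant
    long[] h128 = Murmur3.hash128(data); // full 128-bit hash as two longs
    System.out.println(h32 + " " + h64 + " " + Arrays.toString(h128));
  }
}
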
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java
----------------------------------------------------------------------
diff --git a/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java b/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java
new file mode 100644
index 0000000..5facc7c
--- /dev/null
+++ b/storage-api/src/test/org/apache/hive/common/util/TestMurmur3.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Random;
+
+/**
+ * Tests for Murmur3 variants.
+ */
+public class TestMurmur3 {
+
+ @Test
+ public void testHashCodesM3_32_string() {
+ String key = "test";
+ int seed = 123;
+ HashFunction hf = Hashing.murmur3_32(seed);
+ int hc1 = hf.hashBytes(key.getBytes()).asInt();
+ int hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
+ assertEquals(hc1, hc2);
+
+ key = "testkey";
+ hc1 = hf.hashBytes(key.getBytes()).asInt();
+ hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
+ assertEquals(hc1, hc2);
+ }
+
+ @Test
+ public void testHashCodesM3_32_ints() {
+ int seed = 123;
+ Random rand = new Random(seed);
+ HashFunction hf = Hashing.murmur3_32(seed);
+ for (int i = 0; i < 1000; i++) {
+ int val = rand.nextInt();
+ byte[] data = ByteBuffer.allocate(4).putInt(val).array();
+ int hc1 = hf.hashBytes(data).asInt();
+ int hc2 = Murmur3.hash32(data, data.length, seed);
+ assertEquals(hc1, hc2);
+ }
+ }
+
+ @Test
+ public void testHashCodesM3_32_longs() {
+ int seed = 123;
+ Random rand = new Random(seed);
+ HashFunction hf = Hashing.murmur3_32(seed);
+ for (int i = 0; i < 1000; i++) {
+ long val = rand.nextLong();
+ byte[] data = ByteBuffer.allocate(8).putLong(val).array();
+ int hc1 = hf.hashBytes(data).asInt();
+ int hc2 = Murmur3.hash32(data, data.length, seed);
+ assertEquals(hc1, hc2);
+ }
+ }
+
+ @Test
+ public void testHashCodesM3_32_double() {
+ int seed = 123;
+ Random rand = new Random(seed);
+ HashFunction hf = Hashing.murmur3_32(seed);
+ for (int i = 0; i < 1000; i++) {
+ double val = rand.nextDouble();
+ byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
+ int hc1 = hf.hashBytes(data).asInt();
+ int hc2 = Murmur3.hash32(data, data.length, seed);
+ assertEquals(hc1, hc2);
+ }
+ }
+
+ @Test
+ public void testHashCodesM3_128_string() {
+ String key = "test";
+ int seed = 123;
+ HashFunction hf = Hashing.murmur3_128(seed);
+ // guava stores the hashcodes in little endian order
+ ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+ buf.put(hf.hashBytes(key.getBytes()).asBytes());
+ buf.flip();
+ long gl1 = buf.getLong();
+ long gl2 = buf.getLong(8);
+ long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed);
+ long m1 = hc[0];
+ long m2 = hc[1];
+ assertEquals(gl1, m1);
+ assertEquals(gl2, m2);
+
+ key = "testkey128_testkey128";
+ buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+ buf.put(hf.hashBytes(key.getBytes()).asBytes());
+ buf.flip();
+ gl1 = buf.getLong();
+ gl2 = buf.getLong(8);
+ byte[] keyBytes = key.getBytes();
+ hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed);
+ m1 = hc[0];
+ m2 = hc[1];
+ assertEquals(gl1, m1);
+ assertEquals(gl2, m2);
+
+ byte[] offsetKeyBytes = new byte[keyBytes.length + 35];
+ Arrays.fill(offsetKeyBytes, (byte) -1);
+ System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length);
+ hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed);
+ assertEquals(gl1, hc[0]);
+ assertEquals(gl2, hc[1]);
+ }
+
+ @Test
+ public void testHashCodeM3_64() {
+ byte[] origin = ("It was the best of times, it was the worst of times," +
+ " it was the age of wisdom, it was the age of foolishness," +
+ " it was the epoch of belief, it was the epoch of incredulity," +
+ " it was the season of Light, it was the season of Darkness," +
+ " it was the spring of hope, it was the winter of despair," +
+ " we had everything before us, we had nothing before us," +
+ " we were all going direct to Heaven," +
+ " we were all going direct the other way.").getBytes();
+ long hash = Murmur3.hash64(origin, 0, origin.length);
+ assertEquals(305830725663368540L, hash);
+
+ byte[] originOffset = new byte[origin.length + 150];
+ Arrays.fill(originOffset, (byte) 123);
+ System.arraycopy(origin, 0, originOffset, 150, origin.length);
+ hash = Murmur3.hash64(originOffset, 150, origin.length);
+ assertEquals(305830725663368540L, hash);
+ }
+
+ @Test
+ public void testHashCodesM3_128_ints() {
+ int seed = 123;
+ Random rand = new Random(seed);
+ HashFunction hf = Hashing.murmur3_128(seed);
+ for (int i = 0; i < 1000; i++) {
+ int val = rand.nextInt();
+ byte[] data = ByteBuffer.allocate(4).putInt(val).array();
+ // guava stores the hashcodes in little endian order
+ ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+ buf.put(hf.hashBytes(data).asBytes());
+ buf.flip();
+ long gl1 = buf.getLong();
+ long gl2 = buf.getLong(8);
+ long[] hc = Murmur3.hash128(data, 0, data.length, seed);
+ long m1 = hc[0];
+ long m2 = hc[1];
+ assertEquals(gl1, m1);
+ assertEquals(gl2, m2);
+
+ byte[] offsetData = new byte[data.length + 50];
+ System.arraycopy(data, 0, offsetData, 50, data.length);
+ hc = Murmur3.hash128(offsetData, 50, data.length, seed);
+ assertEquals(gl1, hc[0]);
+ assertEquals(gl2, hc[1]);
+ }
+ }
+
+ @Test
+ public void testHashCodesM3_128_longs() {
+ int seed = 123;
+ Random rand = new Random(seed);
+ HashFunction hf = Hashing.murmur3_128(seed);
+ for (int i = 0; i < 1000; i++) {
+ long val = rand.nextLong();
+ byte[] data = ByteBuffer.allocate(8).putLong(val).array();
+ // guava stores the hashcodes in little endian order
+ ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+ buf.put(hf.hashBytes(data).asBytes());
+ buf.flip();
+ long gl1 = buf.getLong();
+ long gl2 = buf.getLong(8);
+ long[] hc = Murmur3.hash128(data, 0, data.length, seed);
+ long m1 = hc[0];
+ long m2 = hc[1];
+ assertEquals(gl1, m1);
+ assertEquals(gl2, m2);
+ }
+ }
+
+ @Test
+ public void testHashCodesM3_128_double() {
+ int seed = 123;
+ Random rand = new Random(seed);
+ HashFunction hf = Hashing.murmur3_128(seed);
+ for (int i = 0; i < 1000; i++) {
+ double val = rand.nextDouble();
+ byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
+ // guava stores the hashcodes in little endian order
+ ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
+ buf.put(hf.hashBytes(data).asBytes());
+ buf.flip();
+ long gl1 = buf.getLong();
+ long gl2 = buf.getLong(8);
+ long[] hc = Murmur3.hash128(data, 0, data.length, seed);
+ long m1 = hc[0];
+ long m2 = hc[1];
+ assertEquals(gl1, m1);
+ assertEquals(gl2, m2);
+ }
+ }
+}
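The 128-bit comparisons above hinge on one endianness detail worth calling out: Guava returns the hash as bytes in little-endian order, so its HashCode.asLong() (the first 8 bytes, read little-endian) should line up with the first long returned by Murmur3.hash128. A hedged sketch of that equivalence, assuming Guava's documented asLong() behavior:

    byte[] data = "test".getBytes();
    long guavaWord0 = Hashing.murmur3_128(123).hashBytes(data).asLong();
    long orcWord0 = Murmur3.hash128(data, 0, data.length, 123)[0];
    assert guavaWord0 == orcWord0;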
[5/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
Posted by om...@apache.org.
HIVE-12055. Move WriterImpl over to orc module.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/06e39ebe
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/06e39ebe
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/06e39ebe
Branch: refs/heads/master
Commit: 06e39ebe07d7854df669529e73f1c461f3c7d9d4
Parents: 49dc645
Author: Owen O'Malley <om...@apache.org>
Authored: Mon Dec 14 13:35:39 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Dec 14 13:35:39 2015 -0800
----------------------------------------------------------------------
.../apache/hive/common/util/BloomFilter.java | 309 --
.../org/apache/hive/common/util/Murmur3.java | 335 --
.../apache/hive/common/util/TestMurmur3.java | 224 --
orc/src/java/org/apache/orc/BloomFilterIO.java | 43 +
orc/src/java/org/apache/orc/OrcFile.java | 22 +
.../java/org/apache/orc/TypeDescription.java | 26 +-
.../java/org/apache/orc/impl/WriterImpl.java | 2912 +++++++++++++++
.../hive/ql/io/filters/BloomFilterIO.java | 44 -
.../apache/hadoop/hive/ql/io/orc/FileDump.java | 2 +-
.../hadoop/hive/ql/io/orc/JsonFileDump.java | 2 +-
.../apache/hadoop/hive/ql/io/orc/OrcFile.java | 30 +-
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 15 +
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 2 +-
.../apache/hadoop/hive/ql/io/orc/Writer.java | 2 +-
.../hadoop/hive/ql/io/orc/WriterImpl.java | 3394 ++----------------
.../hadoop/hive/ql/util/JavaDataModel.java | 335 --
.../hadoop/hive/ql/io/orc/TestFileDump.java | 25 +-
.../hive/ql/io/orc/TestNewIntegerEncoding.java | 2 +-
.../hadoop/hive/ql/io/orc/TestOrcFile.java | 9 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 12 +-
.../hive/ql/io/orc/TestRecordReaderImpl.java | 2 +-
.../resources/orc-file-dump-bloomfilter.out | 2 +-
.../resources/orc-file-dump-bloomfilter2.out | 2 +-
.../orc-file-dump-dictionary-threshold.out | 2 +-
ql/src/test/resources/orc-file-dump.json | 2 +-
ql/src/test/resources/orc-file-dump.out | 2 +-
ql/src/test/resources/orc-file-has-null.out | 2 +-
.../results/clientpositive/orc_file_dump.q.out | 6 +-
.../results/clientpositive/orc_merge10.q.out | 4 +-
.../results/clientpositive/orc_merge11.q.out | 6 +-
.../clientpositive/tez/orc_merge10.q.out | 4 +-
.../clientpositive/tez/orc_merge11.q.out | 6 +-
.../hadoop/hive/ql/util/JavaDataModel.java | 335 ++
.../apache/hive/common/util/BloomFilter.java | 309 ++
.../org/apache/hive/common/util/Murmur3.java | 335 ++
.../apache/hive/common/util/TestMurmur3.java | 224 ++
36 files changed, 4489 insertions(+), 4499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/common/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java
deleted file mode 100644
index bb0b8f2..0000000
--- a/common/src/java/org/apache/hive/common/util/BloomFilter.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.common.util;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * BloomFilter is a probabilistic data structure for set membership checks. BloomFilters are
- * highly space efficient when compared to using a HashSet. Because of the probabilistic nature
- * of a bloom filter, false positives (test() says true for an element that was never added) are
- * possible, but false negatives are not (if an element is present, test() will never say
- * false). The false positive probability is configurable (default: 5%); the lower the false
- * positive probability, the greater the space requirement.
- * Bloom filters are sensitive to the number of elements that will be inserted, so the expected
- * number of entries must be specified at creation time. If the number of insertions exceeds
- * that initial estimate, the false positive probability will increase accordingly.
- *
- * Internally, this implementation of bloom filter uses the Murmur3 fast non-cryptographic hash
- * algorithm. Although Murmur2 is slightly faster than Murmur3 in Java, it suffers from hash
- * collisions for specific sequences of repeating bytes. Check the following link for more info:
- * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw
- */
-public class BloomFilter {
- public static final double DEFAULT_FPP = 0.05;
- protected BitSet bitSet;
- protected int numBits;
- protected int numHashFunctions;
-
- public BloomFilter() {
- }
-
- public BloomFilter(long expectedEntries) {
- this(expectedEntries, DEFAULT_FPP);
- }
-
- public BloomFilter(long expectedEntries, double fpp) {
- checkArgument(expectedEntries > 0, "expectedEntries should be > 0");
- checkArgument(fpp > 0.0 && fpp < 1.0, "False positive probability should be > 0.0 & < 1.0");
- int nb = optimalNumOfBits(expectedEntries, fpp);
- // make 'm' multiple of 64
- this.numBits = nb + (Long.SIZE - (nb % Long.SIZE));
- this.numHashFunctions = optimalNumOfHashFunctions(expectedEntries, numBits);
- this.bitSet = new BitSet(numBits);
- }
-
- /**
- * A constructor to support rebuilding the BloomFilter from a serialized representation.
- * @param bits
- * @param numBits
- * @param numFuncs
- */
- public BloomFilter(List<Long> bits, int numBits, int numFuncs) {
- super();
- long[] copied = new long[bits.size()];
- for (int i = 0; i < bits.size(); i++) copied[i] = bits.get(i);
- bitSet = new BitSet(copied);
- this.numBits = numBits;
- numHashFunctions = numFuncs;
- }
-
- static int optimalNumOfHashFunctions(long n, long m) {
- return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
- }
-
- static int optimalNumOfBits(long n, double p) {
- return (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
- }
-
- public void add(byte[] val) {
- if (val == null) {
- addBytes(val, -1, -1);
- } else {
- addBytes(val, 0, val.length);
- }
- }
-
- public void addBytes(byte[] val, int offset, int length) {
- // We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
- // by Kirsch et al. From the abstract: 'only two hash functions are necessary to effectively
- // implement a Bloom filter without any loss in the asymptotic false positive probability'
-
- // Let's split up the 64-bit hashcode into two 32-bit hash codes and employ the technique
- // mentioned in the above paper
- long hash64 = val == null ? Murmur3.NULL_HASHCODE :
- Murmur3.hash64(val, offset, length);
- addHash(hash64);
- }
-
- private void addHash(long hash64) {
- int hash1 = (int) hash64;
- int hash2 = (int) (hash64 >>> 32);
-
- for (int i = 1; i <= numHashFunctions; i++) {
- int combinedHash = hash1 + (i * hash2);
- // hashcode should be positive, flip all the bits if it's negative
- if (combinedHash < 0) {
- combinedHash = ~combinedHash;
- }
- int pos = combinedHash % numBits;
- bitSet.set(pos);
- }
- }
-
- public void addString(String val) {
- if (val == null) {
- add(null);
- } else {
- add(val.getBytes());
- }
- }
-
- public void addLong(long val) {
- addHash(getLongHash(val));
- }
-
- public void addDouble(double val) {
- addLong(Double.doubleToLongBits(val));
- }
-
- public boolean test(byte[] val) {
- if (val == null) {
- return testBytes(val, -1, -1);
- }
- return testBytes(val, 0, val.length);
- }
-
- public boolean testBytes(byte[] val, int offset, int length) {
- long hash64 = val == null ? Murmur3.NULL_HASHCODE :
- Murmur3.hash64(val, offset, length);
- return testHash(hash64);
- }
-
- private boolean testHash(long hash64) {
- int hash1 = (int) hash64;
- int hash2 = (int) (hash64 >>> 32);
-
- for (int i = 1; i <= numHashFunctions; i++) {
- int combinedHash = hash1 + (i * hash2);
- // hashcode should be positive, flip all the bits if it's negative
- if (combinedHash < 0) {
- combinedHash = ~combinedHash;
- }
- int pos = combinedHash % numBits;
- if (!bitSet.get(pos)) {
- return false;
- }
- }
- return true;
- }
-
- public boolean testString(String val) {
- if (val == null) {
- return test(null);
- } else {
- return test(val.getBytes());
- }
- }
-
- public boolean testLong(long val) {
- return testHash(getLongHash(val));
- }
-
- // Thomas Wang's integer hash function
- // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
- private long getLongHash(long key) {
- key = (~key) + (key << 21); // key = (key << 21) - key - 1;
- key = key ^ (key >> 24);
- key = (key + (key << 3)) + (key << 8); // key * 265
- key = key ^ (key >> 14);
- key = (key + (key << 2)) + (key << 4); // key * 21
- key = key ^ (key >> 28);
- key = key + (key << 31);
- return key;
- }
-
- public boolean testDouble(double val) {
- return testLong(Double.doubleToLongBits(val));
- }
-
- public long sizeInBytes() {
- return getBitSize() / 8;
- }
-
- public int getBitSize() {
- return bitSet.getData().length * Long.SIZE;
- }
-
- public int getNumHashFunctions() {
- return numHashFunctions;
- }
-
- public long[] getBitSet() {
- return bitSet.getData();
- }
-
- @Override
- public String toString() {
- return "m: " + numBits + " k: " + numHashFunctions;
- }
-
- /**
- * Merge the specified bloom filter with current bloom filter.
- *
- * @param that - bloom filter to merge
- */
- public void merge(BloomFilter that) {
- if (this != that && this.numBits == that.numBits && this.numHashFunctions == that.numHashFunctions) {
- this.bitSet.putAll(that.bitSet);
- } else {
- throw new IllegalArgumentException("BloomFilters are not compatible for merging." +
- " this - " + this.toString() + " that - " + that.toString());
- }
- }
-
- public void reset() {
- this.bitSet.clear();
- }
-
- /**
- * Bare metal bit set implementation. For performance reasons, this implementation does not check
- * for index bounds nor expand the bit set size if the specified index is greater than the size.
- */
- public class BitSet {
- private final long[] data;
-
- public BitSet(long bits) {
- this(new long[(int) Math.ceil((double) bits / (double) Long.SIZE)]);
- }
-
- /**
- * Deserialize long array as bit set.
- *
- * @param data - bit array
- */
- public BitSet(long[] data) {
- assert data.length > 0 : "data length is zero!";
- this.data = data;
- }
-
- /**
- * Sets the bit at specified index.
- *
- * @param index - position
- */
- public void set(int index) {
- data[index >>> 6] |= (1L << index);
- }
-
- /**
- * Returns true if the bit is set in the specified index.
- *
- * @param index - position
- * @return - value at the bit position
- */
- public boolean get(int index) {
- return (data[index >>> 6] & (1L << index)) != 0;
- }
-
- /**
- * Number of bits
- */
- public long bitSize() {
- return (long) data.length * Long.SIZE;
- }
-
- public long[] getData() {
- return data;
- }
-
- /**
- * Combines the two BitArrays using bitwise OR.
- */
- public void putAll(BitSet array) {
- assert data.length == array.data.length :
- "BitArrays must be of equal length (" + data.length + "!= " + array.data.length + ")";
- for (int i = 0; i < data.length; i++) {
- data[i] |= array.data[i];
- }
- }
-
- /**
- * Clear the bit set.
- */
- public void clear() {
- Arrays.fill(data, 0);
- }
- }
-}
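The BloomFilter class is removed here only to reappear verbatim under storage-api (see the file list above). A minimal usage sketch of its API, with sizing worked out from optimalNumOfBits and optimalNumOfHashFunctions: for n = 1,000,000 expected entries at fpp = 0.05, m = -n*ln(p)/ln(2)^2 is about 6.24 million bits (~761 KB) and k = (m/n)*ln(2) rounds to 4 hash functions.

    BloomFilter bf = new BloomFilter(1000000, 0.05);   // m ~ 6.24M bits, k = 4
    bf.addString("row-key");
    bf.addLong(42L);
    boolean maybePresent = bf.testString("row-key");   // always true: no false negatives
    boolean likelyAbsent = bf.testString("missing");   // false ~95% of the time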
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/common/src/java/org/apache/hive/common/util/Murmur3.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/Murmur3.java b/common/src/java/org/apache/hive/common/util/Murmur3.java
deleted file mode 100644
index 88c3514..0000000
--- a/common/src/java/org/apache/hive/common/util/Murmur3.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.common.util;
-
-/**
- * Murmur3 is the successor to Murmur2 in the family of fast non-cryptographic hash algorithms.
- *
- * Murmur3 32 and 128 bit variants.
- * 32-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#94
- * 128-bit Java port of https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp#255
- *
- * This is a public domain code with no copyrights.
- * From homepage of MurmurHash (https://code.google.com/p/smhasher/),
- * "All MurmurHash versions are public domain software, and the author disclaims all copyright
- * to their code."
- */
-public class Murmur3 {
- // from 64-bit linear congruential generator
- public static final long NULL_HASHCODE = 2862933555777941757L;
-
- // Constants for 32 bit variant
- private static final int C1_32 = 0xcc9e2d51;
- private static final int C2_32 = 0x1b873593;
- private static final int R1_32 = 15;
- private static final int R2_32 = 13;
- private static final int M_32 = 5;
- private static final int N_32 = 0xe6546b64;
-
- // Constants for 128 bit variant
- private static final long C1 = 0x87c37b91114253d5L;
- private static final long C2 = 0x4cf5ad432745937fL;
- private static final int R1 = 31;
- private static final int R2 = 27;
- private static final int R3 = 33;
- private static final int M = 5;
- private static final int N1 = 0x52dce729;
- private static final int N2 = 0x38495ab5;
-
- private static final int DEFAULT_SEED = 104729;
-
- /**
- * Murmur3 32-bit variant.
- *
- * @param data - input byte array
- * @return - hashcode
- */
- public static int hash32(byte[] data) {
- return hash32(data, data.length, DEFAULT_SEED);
- }
-
- /**
- * Murmur3 32-bit variant.
- *
- * @param data - input byte array
- * @param length - length of array
- * @param seed - seed (the no-seed overload uses DEFAULT_SEED, 104729)
- * @return - hashcode
- */
- public static int hash32(byte[] data, int length, int seed) {
- int hash = seed;
- final int nblocks = length >> 2;
-
- // body
- for (int i = 0; i < nblocks; i++) {
- int i_4 = i << 2;
- int k = (data[i_4] & 0xff)
- | ((data[i_4 + 1] & 0xff) << 8)
- | ((data[i_4 + 2] & 0xff) << 16)
- | ((data[i_4 + 3] & 0xff) << 24);
-
- // mix functions
- k *= C1_32;
- k = Integer.rotateLeft(k, R1_32);
- k *= C2_32;
- hash ^= k;
- hash = Integer.rotateLeft(hash, R2_32) * M_32 + N_32;
- }
-
- // tail
- int idx = nblocks << 2;
- int k1 = 0;
- switch (length - idx) {
- case 3:
- k1 ^= data[idx + 2] << 16;
- case 2:
- k1 ^= data[idx + 1] << 8;
- case 1:
- k1 ^= data[idx];
-
- // mix functions
- k1 *= C1_32;
- k1 = Integer.rotateLeft(k1, R1_32);
- k1 *= C2_32;
- hash ^= k1;
- }
-
- // finalization
- hash ^= length;
- hash ^= (hash >>> 16);
- hash *= 0x85ebca6b;
- hash ^= (hash >>> 13);
- hash *= 0xc2b2ae35;
- hash ^= (hash >>> 16);
-
- return hash;
- }
-
- /**
- * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
- *
- * @param data - input byte array
- * @return - hashcode
- */
- public static long hash64(byte[] data) {
- return hash64(data, 0, data.length, DEFAULT_SEED);
- }
-
- public static long hash64(byte[] data, int offset, int length) {
- return hash64(data, offset, length, DEFAULT_SEED);
- }
-
- /**
- * Murmur3 64-bit variant. This is essentially MSB 8 bytes of Murmur3 128-bit variant.
- *
- * @param data - input byte array
- * @param length - length of array
- * @param seed - seed (the no-seed overloads use DEFAULT_SEED, 104729)
- * @return - hashcode
- */
- public static long hash64(byte[] data, int offset, int length, int seed) {
- long hash = seed;
- final int nblocks = length >> 3;
-
- // body
- for (int i = 0; i < nblocks; i++) {
- final int i8 = i << 3;
- long k = ((long) data[offset + i8] & 0xff)
- | (((long) data[offset + i8 + 1] & 0xff) << 8)
- | (((long) data[offset + i8 + 2] & 0xff) << 16)
- | (((long) data[offset + i8 + 3] & 0xff) << 24)
- | (((long) data[offset + i8 + 4] & 0xff) << 32)
- | (((long) data[offset + i8 + 5] & 0xff) << 40)
- | (((long) data[offset + i8 + 6] & 0xff) << 48)
- | (((long) data[offset + i8 + 7] & 0xff) << 56);
-
- // mix functions
- k *= C1;
- k = Long.rotateLeft(k, R1);
- k *= C2;
- hash ^= k;
- hash = Long.rotateLeft(hash, R2) * M + N1;
- }
-
- // tail
- long k1 = 0;
- int tailStart = nblocks << 3;
- switch (length - tailStart) {
- case 7:
- k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
- case 6:
- k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
- case 5:
- k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
- case 4:
- k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
- case 3:
- k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
- case 2:
- k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
- case 1:
- k1 ^= ((long) data[offset + tailStart] & 0xff);
- k1 *= C1;
- k1 = Long.rotateLeft(k1, R1);
- k1 *= C2;
- hash ^= k1;
- }
-
- // finalization
- hash ^= length;
- hash = fmix64(hash);
-
- return hash;
- }
-
- /**
- * Murmur3 128-bit variant.
- *
- * @param data - input byte array
- * @return - hashcode (2 longs)
- */
- public static long[] hash128(byte[] data) {
- return hash128(data, 0, data.length, DEFAULT_SEED);
- }
-
- /**
- * Murmur3 128-bit variant.
- *
- * @param data - input byte array
- * @param offset - offset into the array at which hashing starts
- * @param length - number of bytes to hash
- * @param seed - seed (the no-seed overloads use DEFAULT_SEED, 104729)
- * @return - hashcode (2 longs)
- */
- public static long[] hash128(byte[] data, int offset, int length, int seed) {
- long h1 = seed;
- long h2 = seed;
- final int nblocks = length >> 4;
-
- // body
- for (int i = 0; i < nblocks; i++) {
- final int i16 = i << 4;
- long k1 = ((long) data[offset + i16] & 0xff)
- | (((long) data[offset + i16 + 1] & 0xff) << 8)
- | (((long) data[offset + i16 + 2] & 0xff) << 16)
- | (((long) data[offset + i16 + 3] & 0xff) << 24)
- | (((long) data[offset + i16 + 4] & 0xff) << 32)
- | (((long) data[offset + i16 + 5] & 0xff) << 40)
- | (((long) data[offset + i16 + 6] & 0xff) << 48)
- | (((long) data[offset + i16 + 7] & 0xff) << 56);
-
- long k2 = ((long) data[offset + i16 + 8] & 0xff)
- | (((long) data[offset + i16 + 9] & 0xff) << 8)
- | (((long) data[offset + i16 + 10] & 0xff) << 16)
- | (((long) data[offset + i16 + 11] & 0xff) << 24)
- | (((long) data[offset + i16 + 12] & 0xff) << 32)
- | (((long) data[offset + i16 + 13] & 0xff) << 40)
- | (((long) data[offset + i16 + 14] & 0xff) << 48)
- | (((long) data[offset + i16 + 15] & 0xff) << 56);
-
- // mix functions for k1
- k1 *= C1;
- k1 = Long.rotateLeft(k1, R1);
- k1 *= C2;
- h1 ^= k1;
- h1 = Long.rotateLeft(h1, R2);
- h1 += h2;
- h1 = h1 * M + N1;
-
- // mix functions for k2
- k2 *= C2;
- k2 = Long.rotateLeft(k2, R3);
- k2 *= C1;
- h2 ^= k2;
- h2 = Long.rotateLeft(h2, R1);
- h2 += h1;
- h2 = h2 * M + N2;
- }
-
- // tail
- long k1 = 0;
- long k2 = 0;
- int tailStart = nblocks << 4;
- switch (length - tailStart) {
- case 15:
- k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
- case 14:
- k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
- case 13:
- k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
- case 12:
- k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
- case 11:
- k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
- case 10:
- k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
- case 9:
- k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
- k2 *= C2;
- k2 = Long.rotateLeft(k2, R3);
- k2 *= C1;
- h2 ^= k2;
-
- case 8:
- k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
- case 7:
- k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
- case 6:
- k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
- case 5:
- k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
- case 4:
- k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
- case 3:
- k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
- case 2:
- k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
- case 1:
- k1 ^= (long) (data[offset + tailStart] & 0xff);
- k1 *= C1;
- k1 = Long.rotateLeft(k1, R1);
- k1 *= C2;
- h1 ^= k1;
- }
-
- // finalization
- h1 ^= length;
- h2 ^= length;
-
- h1 += h2;
- h2 += h1;
-
- h1 = fmix64(h1);
- h2 = fmix64(h2);
-
- h1 += h2;
- h2 += h1;
-
- return new long[]{h1, h2};
- }
-
- private static long fmix64(long h) {
- h ^= (h >>> 33);
- h *= 0xff51afd7ed558ccdL;
- h ^= (h >>> 33);
- h *= 0xc4ceb9fe1a85ec53L;
- h ^= (h >>> 33);
- return h;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/common/src/test/org/apache/hive/common/util/TestMurmur3.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestMurmur3.java b/common/src/test/org/apache/hive/common/util/TestMurmur3.java
deleted file mode 100644
index 5facc7c..0000000
--- a/common/src/test/org/apache/hive/common/util/TestMurmur3.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hive.common.util;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.util.Arrays;
-import java.util.Random;
-
-/**
- * Tests for Murmur3 variants.
- */
-public class TestMurmur3 {
-
- @Test
- public void testHashCodesM3_32_string() {
- String key = "test";
- int seed = 123;
- HashFunction hf = Hashing.murmur3_32(seed);
- int hc1 = hf.hashBytes(key.getBytes()).asInt();
- int hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
- assertEquals(hc1, hc2);
-
- key = "testkey";
- hc1 = hf.hashBytes(key.getBytes()).asInt();
- hc2 = Murmur3.hash32(key.getBytes(), key.getBytes().length, seed);
- assertEquals(hc1, hc2);
- }
-
- @Test
- public void testHashCodesM3_32_ints() {
- int seed = 123;
- Random rand = new Random(seed);
- HashFunction hf = Hashing.murmur3_32(seed);
- for (int i = 0; i < 1000; i++) {
- int val = rand.nextInt();
- byte[] data = ByteBuffer.allocate(4).putInt(val).array();
- int hc1 = hf.hashBytes(data).asInt();
- int hc2 = Murmur3.hash32(data, data.length, seed);
- assertEquals(hc1, hc2);
- }
- }
-
- @Test
- public void testHashCodesM3_32_longs() {
- int seed = 123;
- Random rand = new Random(seed);
- HashFunction hf = Hashing.murmur3_32(seed);
- for (int i = 0; i < 1000; i++) {
- long val = rand.nextLong();
- byte[] data = ByteBuffer.allocate(8).putLong(val).array();
- int hc1 = hf.hashBytes(data).asInt();
- int hc2 = Murmur3.hash32(data, data.length, seed);
- assertEquals(hc1, hc2);
- }
- }
-
- @Test
- public void testHashCodesM3_32_double() {
- int seed = 123;
- Random rand = new Random(seed);
- HashFunction hf = Hashing.murmur3_32(seed);
- for (int i = 0; i < 1000; i++) {
- double val = rand.nextDouble();
- byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
- int hc1 = hf.hashBytes(data).asInt();
- int hc2 = Murmur3.hash32(data, data.length, seed);
- assertEquals(hc1, hc2);
- }
- }
-
- @Test
- public void testHashCodesM3_128_string() {
- String key = "test";
- int seed = 123;
- HashFunction hf = Hashing.murmur3_128(seed);
- // guava stores the hashcodes in little endian order
- ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
- buf.put(hf.hashBytes(key.getBytes()).asBytes());
- buf.flip();
- long gl1 = buf.getLong();
- long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed);
- long m1 = hc[0];
- long m2 = hc[1];
- assertEquals(gl1, m1);
- assertEquals(gl2, m2);
-
- key = "testkey128_testkey128";
- buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
- buf.put(hf.hashBytes(key.getBytes()).asBytes());
- buf.flip();
- gl1 = buf.getLong();
- gl2 = buf.getLong(8);
- byte[] keyBytes = key.getBytes();
- hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed);
- m1 = hc[0];
- m2 = hc[1];
- assertEquals(gl1, m1);
- assertEquals(gl2, m2);
-
- byte[] offsetKeyBytes = new byte[keyBytes.length + 35];
- Arrays.fill(offsetKeyBytes, (byte) -1);
- System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length);
- hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed);
- assertEquals(gl1, hc[0]);
- assertEquals(gl2, hc[1]);
- }
-
- @Test
- public void testHashCodeM3_64() {
- byte[] origin = ("It was the best of times, it was the worst of times," +
- " it was the age of wisdom, it was the age of foolishness," +
- " it was the epoch of belief, it was the epoch of incredulity," +
- " it was the season of Light, it was the season of Darkness," +
- " it was the spring of hope, it was the winter of despair," +
- " we had everything before us, we had nothing before us," +
- " we were all going direct to Heaven," +
- " we were all going direct the other way.").getBytes();
- long hash = Murmur3.hash64(origin, 0, origin.length);
- assertEquals(305830725663368540L, hash);
-
- byte[] originOffset = new byte[origin.length + 150];
- Arrays.fill(originOffset, (byte) 123);
- System.arraycopy(origin, 0, originOffset, 150, origin.length);
- hash = Murmur3.hash64(originOffset, 150, origin.length);
- assertEquals(305830725663368540L, hash);
- }
-
- @Test
- public void testHashCodesM3_128_ints() {
- int seed = 123;
- Random rand = new Random(seed);
- HashFunction hf = Hashing.murmur3_128(seed);
- for (int i = 0; i < 1000; i++) {
- int val = rand.nextInt();
- byte[] data = ByteBuffer.allocate(4).putInt(val).array();
- // guava stores the hashcodes in little endian order
- ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
- buf.put(hf.hashBytes(data).asBytes());
- buf.flip();
- long gl1 = buf.getLong();
- long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(data, 0, data.length, seed);
- long m1 = hc[0];
- long m2 = hc[1];
- assertEquals(gl1, m1);
- assertEquals(gl2, m2);
-
- byte[] offsetData = new byte[data.length + 50];
- System.arraycopy(data, 0, offsetData, 50, data.length);
- hc = Murmur3.hash128(offsetData, 50, data.length, seed);
- assertEquals(gl1, hc[0]);
- assertEquals(gl2, hc[1]);
- }
- }
-
- @Test
- public void testHashCodesM3_128_longs() {
- int seed = 123;
- Random rand = new Random(seed);
- HashFunction hf = Hashing.murmur3_128(seed);
- for (int i = 0; i < 1000; i++) {
- long val = rand.nextLong();
- byte[] data = ByteBuffer.allocate(8).putLong(val).array();
- // guava stores the hashcodes in little endian order
- ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
- buf.put(hf.hashBytes(data).asBytes());
- buf.flip();
- long gl1 = buf.getLong();
- long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(data, 0, data.length, seed);
- long m1 = hc[0];
- long m2 = hc[1];
- assertEquals(gl1, m1);
- assertEquals(gl2, m2);
- }
- }
-
- @Test
- public void testHashCodesM3_128_double() {
- int seed = 123;
- Random rand = new Random(seed);
- HashFunction hf = Hashing.murmur3_128(seed);
- for (int i = 0; i < 1000; i++) {
- double val = rand.nextDouble();
- byte[] data = ByteBuffer.allocate(8).putDouble(val).array();
- // guava stores the hashcodes in little endian order
- ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
- buf.put(hf.hashBytes(data).asBytes());
- buf.flip();
- long gl1 = buf.getLong();
- long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(data, 0, data.length, seed);
- long m1 = hc[0];
- long m2 = hc[1];
- assertEquals(gl1, m1);
- assertEquals(gl2, m2);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/BloomFilterIO.java b/orc/src/java/org/apache/orc/BloomFilterIO.java
new file mode 100644
index 0000000..1406266
--- /dev/null
+++ b/orc/src/java/org/apache/orc/BloomFilterIO.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hive.common.util.BloomFilter;
+
+import com.google.common.primitives.Longs;
+
+public class BloomFilterIO extends BloomFilter {
+
+ public BloomFilterIO(long expectedEntries) {
+ super(expectedEntries, DEFAULT_FPP);
+ }
+
+ public BloomFilterIO(long expectedEntries, double fpp) {
+ super(expectedEntries, fpp);
+ }
+
+ /**
+ * Initializes the BloomFilter from the given ORC protobuf BloomFilter.
+ */
+ public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
+ this.bitSet = new BitSet(Longs.toArray(bloomFilter.getBitsetList()));
+ this.numHashFunctions = bloomFilter.getNumHashFunctions();
+ this.numBits = (int) this.bitSet.bitSize();
+ }
+}
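Since BloomFilterIO only adds protobuf plumbing on top of BloomFilter, a hypothetical round trip looks like the sketch below; addAllBitset and setNumHashFunctions are assumed to be the standard generated builder methods mirroring the getters used in the constructor above.

    // hypothetical round trip through the OrcProto.BloomFilter message
    BloomFilterIO written = new BloomFilterIO(10000, 0.01);
    written.addLong(1234L);
    OrcProto.BloomFilter proto = OrcProto.BloomFilter.newBuilder()
        .addAllBitset(Longs.asList(written.getBitSet()))
        .setNumHashFunctions(written.getNumHashFunctions())
        .build();
    BloomFilterIO restored = new BloomFilterIO(proto);
    assert restored.testLong(1234L);   // membership survives the round trip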
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/OrcFile.java b/orc/src/java/org/apache/orc/OrcFile.java
index 9ea0b52..98226f9 100644
--- a/orc/src/java/org/apache/orc/OrcFile.java
+++ b/orc/src/java/org/apache/orc/OrcFile.java
@@ -23,7 +23,9 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.MemoryManager;
+import org.apache.orc.impl.WriterImpl;
/**
* Contains factory methods to read or write ORC files.
@@ -102,6 +104,8 @@ public class OrcFile {
ORIGINAL(0),
HIVE_8732(1), // corrupted stripe/file maximum column statistics
HIVE_4243(2), // use real column names from Hive tables
+ HIVE_12055(3), // vectorized writer
+
// Don't use any magic numbers here except for the below:
FUTURE(Integer.MAX_VALUE); // a version from a future writer
@@ -138,6 +142,7 @@ public class OrcFile {
return values[val];
}
}
+ public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_12055;
public enum EncodingStrategy {
SPEED, COMPRESSION
@@ -511,4 +516,21 @@ public class OrcFile {
return memoryManager.get();
}
+ /**
+ * Create an ORC file writer. This is the public interface for creating
+ * writers going forward; new options will only be added to this method.
+ * @param path filename to write to
+ * @param opts the options
+ * @return a new ORC file writer
+ * @throws IOException
+ */
+ public static Writer createWriter(Path path,
+ WriterOptions opts
+ ) throws IOException {
+ FileSystem fs = opts.getFileSystem() == null ?
+ path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
+
+ return new WriterImpl(fs, path, opts);
+ }
+
}
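A hedged end-to-end sketch of the new entry point, given a Hadoop Configuration conf; writerOptions(conf), setSchema(...) and addRowBatch(...) are assumed from the orc-module WriterOptions/Writer API rather than shown in this hunk.

    TypeDescription schema = TypeDescription.createStruct()
        .addField("x", TypeDescription.createLong());
    Writer writer = OrcFile.createWriter(new Path("/tmp/demo.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    for (long r = 0; r < 10; ++r) {
      x.vector[batch.size++] = r * r;   // fill one column, one row at a time
    }
    writer.addRowBatch(batch);
    writer.close();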
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java
index fc945e4..f97a113 100644
--- a/orc/src/java/org/apache/orc/TypeDescription.java
+++ b/orc/src/java/org/apache/orc/TypeDescription.java
@@ -275,7 +275,7 @@ public class TypeDescription {
return maxId;
}
- private ColumnVector createColumn() {
+ private ColumnVector createColumn(int maxSize) {
switch (category) {
case BOOLEAN:
case BYTE:
@@ -298,7 +298,7 @@ public class TypeDescription {
case STRUCT: {
ColumnVector[] fieldVector = new ColumnVector[children.size()];
for(int i=0; i < fieldVector.length; ++i) {
- fieldVector[i] = children.get(i).createColumn();
+ fieldVector[i] = children.get(i).createColumn(maxSize);
}
return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
fieldVector);
@@ -306,38 +306,42 @@ public class TypeDescription {
case UNION: {
ColumnVector[] fieldVector = new ColumnVector[children.size()];
for(int i=0; i < fieldVector.length; ++i) {
- fieldVector[i] = children.get(i).createColumn();
+ fieldVector[i] = children.get(i).createColumn(maxSize);
}
return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
fieldVector);
}
case LIST:
return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- children.get(0).createColumn());
+ children.get(0).createColumn(maxSize));
case MAP:
return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- children.get(0).createColumn(), children.get(1).createColumn());
+ children.get(0).createColumn(maxSize),
+ children.get(1).createColumn(maxSize));
default:
throw new IllegalArgumentException("Unknown type " + category);
}
}
- public VectorizedRowBatch createRowBatch() {
+ public VectorizedRowBatch createRowBatch(int maxSize) {
VectorizedRowBatch result;
if (category == Category.STRUCT) {
- result = new VectorizedRowBatch(children.size(),
- VectorizedRowBatch.DEFAULT_SIZE);
+ result = new VectorizedRowBatch(children.size(), maxSize);
for(int i=0; i < result.cols.length; ++i) {
- result.cols[i] = children.get(i).createColumn();
+ result.cols[i] = children.get(i).createColumn(maxSize);
}
} else {
- result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE);
- result.cols[0] = createColumn();
+ result = new VectorizedRowBatch(1, maxSize);
+ result.cols[0] = createColumn(maxSize);
}
result.reset();
return result;
}
+ public VectorizedRowBatch createRowBatch() {
+ return createRowBatch(VectorizedRowBatch.DEFAULT_SIZE);
+ }
+
/**
* Get the kind of this type.
* @return get the category for this type.
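The new maxSize parameter threads through every child column, so callers with very wide schemas can bound per-batch memory; the no-argument overload keeps the old VectorizedRowBatch.DEFAULT_SIZE (1024-row) behavior. A small sketch, reusing the schema from the writer example above:

    VectorizedRowBatch small = schema.createRowBatch(128);  // at most 128 rows per batch
    VectorizedRowBatch full  = schema.createRowBatch();     // DEFAULT_SIZE (1024) rows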
[4/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/orc/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java
new file mode 100644
index 0000000..5157d4d
--- /dev/null
+++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java
@@ -0,0 +1,2912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.orc.BinaryColumnStatistics;
+import org.apache.orc.BloomFilterIO;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.StringColumnStatistics;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * An ORC file writer. The file is divided into stripes, which is the natural
+ * unit of work when reading. Each stripe is buffered in memory until the
+ * buffered data reaches the stripe size, and then it is written out broken
+ * down by columns. Each column is written by a TreeWriter that is specific
+ * to that type of column. TreeWriters may have child TreeWriters that handle
+ * the sub-types. Each of the TreeWriters writes the column's data as a set
+ * of streams.
+ *
+ * This class is unsynchronized like most Stream objects, so the creation of
+ * an OrcFile and all access to a single instance must happen from a single
+ * thread.
+ *
+ * There are no known cases where these operations happen across threads
+ * today.
+ *
+ * Caveat: the MemoryManager is created when the WriterOptions are built, so
+ * it has to be confined to a single thread as well.
+ *
+ */
+public class WriterImpl implements Writer, MemoryManager.Callback {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
+
+ private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+ private static final int MIN_ROW_INDEX_STRIDE = 1000;
+
+ // threshold above which buffer size will be automatically resized
+ private static final int COLUMN_COUNT_THRESHOLD = 1000;
+
+ private final FileSystem fs;
+ private final Path path;
+ private final long defaultStripeSize;
+ private long adjustedStripeSize;
+ private final int rowIndexStride;
+ private final CompressionKind compress;
+ private final CompressionCodec codec;
+ private final boolean addBlockPadding;
+ private final int bufferSize;
+ private final long blockSize;
+ private final double paddingTolerance;
+ private final TypeDescription schema;
+
+ // the streams that make up the current stripe
+ private final Map<StreamName, BufferedStream> streams =
+ new TreeMap<StreamName, BufferedStream>();
+
+ private FSDataOutputStream rawWriter = null;
+ // the compressed metadata information outStream
+ private OutStream writer = null;
+ // a protobuf outStream around streamFactory
+ private CodedOutputStream protobufWriter = null;
+ private long headerLength;
+ private int columnCount;
+ private long rowCount = 0;
+ private long rowsInStripe = 0;
+ private long rawDataSize = 0;
+ private int rowsInIndex = 0;
+ private int stripesAtLastFlush = -1;
+ private final List<OrcProto.StripeInformation> stripes =
+ new ArrayList<OrcProto.StripeInformation>();
+ private final Map<String, ByteString> userMetadata =
+ new TreeMap<String, ByteString>();
+ private final StreamFactory streamFactory = new StreamFactory();
+ private final TreeWriter treeWriter;
+ private final boolean buildIndex;
+ private final MemoryManager memoryManager;
+ private final OrcFile.Version version;
+ private final Configuration conf;
+ private final OrcFile.WriterCallback callback;
+ private final OrcFile.WriterContext callbackContext;
+ private final OrcFile.EncodingStrategy encodingStrategy;
+ private final OrcFile.CompressionStrategy compressionStrategy;
+ private final boolean[] bloomFilterColumns;
+ private final double bloomFilterFpp;
+ private boolean writeTimeZone;
+
+ public WriterImpl(FileSystem fs,
+ Path path,
+ OrcFile.WriterOptions opts) throws IOException {
+ this.fs = fs;
+ this.path = path;
+ this.conf = opts.getConfiguration();
+ this.callback = opts.getCallback();
+ this.schema = opts.getSchema();
+ if (callback != null) {
+ callbackContext = new OrcFile.WriterContext(){
+
+ @Override
+ public Writer getWriter() {
+ return WriterImpl.this;
+ }
+ };
+ } else {
+ callbackContext = null;
+ }
+ this.adjustedStripeSize = opts.getStripeSize();
+ this.defaultStripeSize = opts.getStripeSize();
+ this.version = opts.getVersion();
+ this.encodingStrategy = opts.getEncodingStrategy();
+ this.compressionStrategy = opts.getCompressionStrategy();
+ this.addBlockPadding = opts.getBlockPadding();
+ this.blockSize = opts.getBlockSize();
+ this.paddingTolerance = opts.getPaddingTolerance();
+ this.compress = opts.getCompress();
+ this.rowIndexStride = opts.getRowIndexStride();
+ this.memoryManager = opts.getMemoryManager();
+ buildIndex = rowIndexStride > 0;
+ codec = createCodec(compress);
+ int numColumns = schema.getMaximumId() + 1;
+ this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
+ numColumns, opts.getBufferSize());
+ if (version == OrcFile.Version.V_0_11) {
+ /* do not write bloom filters for ORC v11 */
+ this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
+ } else {
+ this.bloomFilterColumns =
+ OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
+ }
+ this.bloomFilterFpp = opts.getBloomFilterFpp();
+ treeWriter = createTreeWriter(schema, streamFactory, false);
+ if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
+ throw new IllegalArgumentException("Row stride must be at least " +
+ MIN_ROW_INDEX_STRIDE);
+ }
+
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, opts.getStripeSize(), this);
+ }
+
+ @VisibleForTesting
+ public static int getEstimatedBufferSize(long stripeSize, int numColumns,
+ int bs) {
+ // The worst case is that there are 2 big streams per column and
+ // we want to guarantee that each stream gets ~10 buffers.
+ // This keeps buffers small enough that we don't get really small stripe
+ // sizes.
+ int estBufferSize = (int) (stripeSize / (20 * numColumns));
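+ // e.g. a 64MB stripe with 1,000 columns gives 64MB / 20,000 ~ 3.3KB,
+ // which getClosestBufferSize below rounds up to the 4KB bucket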
+ estBufferSize = getClosestBufferSize(estBufferSize);
+ if (estBufferSize > bs) {
+ estBufferSize = bs;
+ } else {
+ LOG.info("WIDE TABLE - Number of columns: " + numColumns +
+ " Chosen compression buffer size: " + estBufferSize);
+ }
+ return estBufferSize;
+ }
+
+ private static int getClosestBufferSize(int estBufferSize) {
+ final int kb4 = 4 * 1024;
+ final int kb8 = 8 * 1024;
+ final int kb16 = 16 * 1024;
+ final int kb32 = 32 * 1024;
+ final int kb64 = 64 * 1024;
+ final int kb128 = 128 * 1024;
+ final int kb256 = 256 * 1024;
+ if (estBufferSize <= kb4) {
+ return kb4;
+ } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
+ return kb8;
+ } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
+ return kb16;
+ } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
+ return kb32;
+ } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
+ return kb64;
+ } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
+ return kb128;
+ } else {
+ return kb256;
+ }
+ }
+
+ public static CompressionCodec createCodec(CompressionKind kind) {
+ switch (kind) {
+ case NONE:
+ return null;
+ case ZLIB:
+ return new ZlibCodec();
+ case SNAPPY:
+ return new SnappyCodec();
+ case LZO:
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ if (loader == null) {
+ loader = WriterImpl.class.getClassLoader();
+ }
+ @SuppressWarnings("unchecked")
+ Class<? extends CompressionCodec> lzo =
+ (Class<? extends CompressionCodec>)
+ loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
+ return lzo.newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("LZO is not available.", e);
+ } catch (InstantiationException e) {
+ throw new IllegalArgumentException("Problem initializing LZO", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalArgumentException("Insufficient access to LZO", e);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown compression codec: " +
+ kind);
+ }
+ }
+
+ @Override
+ public boolean checkMemory(double newScale) throws IOException {
+ long limit = (long) Math.round(adjustedStripeSize * newScale);
+ long size = estimateStripeSize();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
+ limit);
+ }
+ if (size > limit) {
+ flushStripe();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This class is used to hold the contents of streams as they are buffered.
+ * The TreeWriters write to the outStream and the codec compresses the
+ * data as buffers fill up and stores them in the output list. When the
+ * stripe is being written, the whole stream is written to the file.
+ */
+ private class BufferedStream implements OutStream.OutputReceiver {
+ private final OutStream outStream;
+ private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
+
+ BufferedStream(String name, int bufferSize,
+ CompressionCodec codec) throws IOException {
+ outStream = new OutStream(name, bufferSize, codec, this);
+ }
+
+ /**
+ * Receive a buffer from the compression codec.
+ * @param buffer the buffer to save
+ */
+ @Override
+ public void output(ByteBuffer buffer) {
+ output.add(buffer);
+ }
+
+ /**
+ * Get the number of bytes in buffers that are allocated to this stream.
+ * @return number of bytes in buffers
+ */
+ public long getBufferSize() {
+ long result = 0;
+ for(ByteBuffer buf: output) {
+ result += buf.capacity();
+ }
+ return outStream.getBufferSize() + result;
+ }
+
+ /**
+ * Flush the stream to the codec.
+ * @throws IOException
+ */
+ public void flush() throws IOException {
+ outStream.flush();
+ }
+
+ /**
+ * Clear all of the buffers.
+ * @throws IOException
+ */
+ public void clear() throws IOException {
+ outStream.clear();
+ output.clear();
+ }
+
+ /**
+ * Check the state of suppress flag in output stream
+ * @return value of suppress flag
+ */
+ public boolean isSuppressed() {
+ return outStream.isSuppressed();
+ }
+
+ /**
+ * Get the number of bytes that will be written to the output. Assumes
+ * the stream has already been flushed.
+ * @return the number of bytes
+ */
+ public long getOutputSize() {
+ long result = 0;
+ for(ByteBuffer buffer: output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+
+ /**
+ * Write the saved compressed buffers to the OutputStream.
+ * @param out the stream to write to
+ * @throws IOException
+ */
+ void spillTo(OutputStream out) throws IOException {
+ for(ByteBuffer buffer: output) {
+ out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return outStream.toString();
+ }
+ }
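
BufferedStream is the buffering implementation of the OutputReceiver callback: the codec hands it completed ByteBuffers, which it parks in a list until spillTo writes them out. A self-contained sketch of that receive-then-spill pattern with the codec left out (names illustrative, not from the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class ReceiverDemo {
      static final List<ByteBuffer> output = new ArrayList<ByteBuffer>();

      // Stand-in for OutputReceiver.output(ByteBuffer): park the buffer.
      static void receive(ByteBuffer buffer) {
        output.add(buffer);
      }

      // Stand-in for spillTo: write every parked buffer to the stream.
      static void spillTo(OutputStream out) throws IOException {
        for (ByteBuffer buffer : output) {
          out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
              buffer.remaining());
        }
      }

      public static void main(String[] args) throws IOException {
        receive(ByteBuffer.wrap("hello ".getBytes("UTF-8")));
        receive(ByteBuffer.wrap("world".getBytes("UTF-8")));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        spillTo(out);
        System.out.println(out.toString("UTF-8")); // hello world
      }
    }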
+
+ /**
+ * An output receiver that writes the ByteBuffers to the output stream
+ * as they are received.
+ */
+ private class DirectStream implements OutStream.OutputReceiver {
+ private final FSDataOutputStream output;
+
+ DirectStream(FSDataOutputStream output) {
+ this.output = output;
+ }
+
+ @Override
+ public void output(ByteBuffer buffer) throws IOException {
+ output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
+ buffer.remaining());
+ }
+ }
+
+ private static class RowIndexPositionRecorder implements PositionRecorder {
+ private final OrcProto.RowIndexEntry.Builder builder;
+
+ RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
+ this.builder = builder;
+ }
+
+ @Override
+ public void addPosition(long position) {
+ builder.addPositions(position);
+ }
+ }
+
+ /**
+ * Interface from the Writer to the TreeWriters. This limits the visibility
+ * that the TreeWriters have into the Writer.
+ */
+ private class StreamFactory {
+ /**
+ * Create a stream to store part of a column.
+ * @param column the column id for the stream
+ * @param kind the kind of stream
+ * @return the output stream that the section should be written to.
+ * @throws IOException
+ */
+ public OutStream createStream(int column,
+ OrcProto.Stream.Kind kind
+ ) throws IOException {
+ final StreamName name = new StreamName(column, kind);
+ final EnumSet<CompressionCodec.Modifier> modifiers;
+
+ switch (kind) {
+ case BLOOM_FILTER:
+ case DATA:
+ case DICTIONARY_DATA:
+ if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
+ modifiers = EnumSet.of(CompressionCodec.Modifier.FAST,
+ CompressionCodec.Modifier.TEXT);
+ } else {
+ modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT,
+ CompressionCodec.Modifier.TEXT);
+ }
+ break;
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST,
+ CompressionCodec.Modifier.BINARY);
+ break;
+ default:
+ LOG.warn("Missing ORC compression modifiers for " + kind);
+ modifiers = null;
+ break;
+ }
+
+ BufferedStream result = streams.get(name);
+ if (result == null) {
+ result = new BufferedStream(name.toString(), bufferSize,
+ codec == null ? codec : codec.modify(modifiers));
+ streams.put(name, result);
+ }
+ return result.outStream;
+ }
+
+ /**
+ * Get the next column id.
+ * @return a number from 0 to the number of columns - 1
+ */
+ public int getNextColumnId() {
+ return columnCount++;
+ }
+
+ /**
+ * Get the stride rate of the row index.
+ */
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ /**
+ * Should the writer be building the row index?
+ * @return true if we are building the index
+ */
+ public boolean buildIndex() {
+ return buildIndex;
+ }
+
+ /**
+ * Is the ORC file compressed?
+ * @return true if the streams are compressed
+ */
+ public boolean isCompressed() {
+ return codec != null;
+ }
+
+ /**
+ * Get the encoding strategy to use.
+ * @return encoding strategy
+ */
+ public OrcFile.EncodingStrategy getEncodingStrategy() {
+ return encodingStrategy;
+ }
+
+ /**
+ * Get the compression strategy to use.
+ * @return compression strategy
+ */
+ public OrcFile.CompressionStrategy getCompressionStrategy() {
+ return compressionStrategy;
+ }
+
+ /**
+ * Get the bloom filter columns
+ * @return bloom filter columns
+ */
+ public boolean[] getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ /**
+ * Get the bloom filter false positive probability.
+ * @return the false positive probability
+ */
+ public double getBloomFilterFPP() {
+ return bloomFilterFpp;
+ }
+
+ /**
+ * Get the writer's configuration.
+ * @return configuration
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ /**
+ * Get the version of the file to write.
+ */
+ public OrcFile.Version getVersion() {
+ return version;
+ }
+
+ public void useWriterTimeZone(boolean val) {
+ writeTimeZone = val;
+ }
+
+ public boolean hasWriterTimeZone() {
+ return writeTimeZone;
+ }
+ }
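
The interesting logic in StreamFactory.createStream is the choice of compression modifiers: payload streams (DATA, DICTIONARY_DATA, BLOOM_FILTER) follow the configured compression strategy with a TEXT hint, while the small, repetitive index-style streams always compress with FASTEST/BINARY. A standalone sketch of that selection, with local enums standing in for the OrcProto and CompressionCodec types:

    import java.util.EnumSet;

    public class ModifierChoice {
      enum Kind { BLOOM_FILTER, DATA, DICTIONARY_DATA,
                  LENGTH, DICTIONARY_COUNT, PRESENT, ROW_INDEX, SECONDARY }
      enum Modifier { FAST, FASTEST, DEFAULT, TEXT, BINARY }

      // Payload streams honor the strategy; index-like streams are small
      // and easily compressed, so the fastest binary mode is always used.
      static EnumSet<Modifier> modifiers(Kind kind, boolean speedStrategy) {
        switch (kind) {
          case BLOOM_FILTER:
          case DATA:
          case DICTIONARY_DATA:
            return speedStrategy
                ? EnumSet.of(Modifier.FAST, Modifier.TEXT)
                : EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
          default:
            return EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
        }
      }

      public static void main(String[] args) {
        System.out.println(modifiers(Kind.DATA, true));      // [FAST, TEXT]
        System.out.println(modifiers(Kind.ROW_INDEX, true)); // [FASTEST, BINARY]
      }
    }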
+
+ /**
+ * The parent class of all of the writers for each column. Each column
+ * is written by an instance of this class. The compound types (struct,
+ * list, map, and union) have child tree writers that write their child
+ * types.
+ */
+ private abstract static class TreeWriter {
+ protected final int id;
+ protected final BitFieldWriter isPresent;
+ private final boolean isCompressed;
+ protected final ColumnStatisticsImpl indexStatistics;
+ protected final ColumnStatisticsImpl stripeColStatistics;
+ private final ColumnStatisticsImpl fileStatistics;
+ protected TreeWriter[] childrenWriters;
+ protected final RowIndexPositionRecorder rowIndexPosition;
+ private final OrcProto.RowIndex.Builder rowIndex;
+ private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
+ private final PositionedOutputStream rowIndexStream;
+ private final PositionedOutputStream bloomFilterStream;
+ protected final BloomFilterIO bloomFilter;
+ protected final boolean createBloomFilter;
+ private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
+ private final OrcProto.BloomFilter.Builder bloomFilterEntry;
+ private boolean foundNulls;
+ private OutStream isPresentOutStream;
+ private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
+ private final StreamFactory streamFactory;
+
+ /**
+ * Create a tree writer.
+ * @param columnId the column id of the column to write
+ * @param schema the row schema
+ * @param streamFactory limited access to the Writer's data.
+ * @param nullable can the value be null?
+ * @throws IOException
+ */
+ TreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ this.streamFactory = streamFactory;
+ this.isCompressed = streamFactory.isCompressed();
+ this.id = columnId;
+ if (nullable) {
+ isPresentOutStream = streamFactory.createStream(id,
+ OrcProto.Stream.Kind.PRESENT);
+ isPresent = new BitFieldWriter(isPresentOutStream, 1);
+ } else {
+ isPresent = null;
+ }
+ this.foundNulls = false;
+ createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
+ indexStatistics = ColumnStatisticsImpl.create(schema);
+ stripeColStatistics = ColumnStatisticsImpl.create(schema);
+ fileStatistics = ColumnStatisticsImpl.create(schema);
+ childrenWriters = new TreeWriter[0];
+ rowIndex = OrcProto.RowIndex.newBuilder();
+ rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
+ rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
+ stripeStatsBuilders = Lists.newArrayList();
+ if (streamFactory.buildIndex()) {
+ rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
+ } else {
+ rowIndexStream = null;
+ }
+ if (createBloomFilter) {
+ bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
+ bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
+ bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
+ bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
+ streamFactory.getBloomFilterFPP());
+ } else {
+ bloomFilterEntry = null;
+ bloomFilterIndex = null;
+ bloomFilterStream = null;
+ bloomFilter = null;
+ }
+ }
+
+ protected OrcProto.RowIndex.Builder getRowIndex() {
+ return rowIndex;
+ }
+
+ protected ColumnStatisticsImpl getStripeStatistics() {
+ return stripeColStatistics;
+ }
+
+ protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
+ return rowIndexEntry;
+ }
+
+ IntegerWriter createIntegerWriter(PositionedOutputStream output,
+ boolean signed, boolean isDirectV2,
+ StreamFactory writer) {
+ if (isDirectV2) {
+ boolean alignedBitpacking = false;
+ if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
+ alignedBitpacking = true;
+ }
+ return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
+ } else {
+ return new RunLengthIntegerWriter(output, signed);
+ }
+ }
+
+ boolean isNewWriteFormat(StreamFactory writer) {
+ return writer.getVersion() != OrcFile.Version.V_0_11;
+ }
+
+ /**
+ * Handle the top level object write.
+ *
+ * This default method is used for all types except structs, which are the
+ * typical case. Because VectorizedRowBatch assumes the top level object is
+ * a struct, all other types are written from the batch's first column.
+ * @param batch the batch to write from
+ * @param offset the row to start on
+ * @param length the number of rows to write
+ * @throws IOException
+ */
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
+ writeBatch(batch.cols[0], offset, length);
+ }
+
+ /**
+ * Write the values from the given vector from offset for length elements.
+ * @param vector the vector to write from
+ * @param offset the first value from the vector to write
+ * @param length the number of values from the vector to write
+ * @throws IOException
+ */
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ if (vector.noNulls) {
+ indexStatistics.increment(length);
+ if (isPresent != null) {
+ for (int i = 0; i < length; ++i) {
+ isPresent.write(1);
+ }
+ }
+ } else {
+ if (vector.isRepeating) {
+ boolean isNull = vector.isNull[0];
+ if (isPresent != null) {
+ for (int i = 0; i < length; ++i) {
+ isPresent.write(isNull ? 0 : 1);
+ }
+ }
+ if (isNull) {
+ foundNulls = true;
+ indexStatistics.setNull();
+ } else {
+ indexStatistics.increment(length);
+ }
+ } else {
+ // count the number of non-null values
+ int nonNullCount = 0;
+ for(int i = 0; i < length; ++i) {
+ boolean isNull = vector.isNull[i + offset];
+ if (!isNull) {
+ nonNullCount += 1;
+ }
+ if (isPresent != null) {
+ isPresent.write(isNull ? 0 : 1);
+ }
+ }
+ indexStatistics.increment(nonNullCount);
+ if (nonNullCount != length) {
+ foundNulls = true;
+ indexStatistics.setNull();
+ }
+ }
+ }
+ }
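
The base writeBatch only maintains the PRESENT bitstream and the null statistics; subclasses write the actual values. A standalone sketch of its three cases (no nulls, repeating, mixed), reduced to printing the present bits and counting non-null rows:

    public class PresentBits {
      // Prints one present bit per row; returns the non-null row count.
      static int writePresent(boolean noNulls, boolean isRepeating,
                              boolean[] isNull, int offset, int length) {
        if (noNulls) {
          for (int i = 0; i < length; ++i) System.out.print(1);
          return length;
        }
        if (isRepeating) {
          int bit = isNull[0] ? 0 : 1;
          for (int i = 0; i < length; ++i) System.out.print(bit);
          return isNull[0] ? 0 : length;
        }
        int nonNull = 0;
        for (int i = 0; i < length; ++i) {
          boolean n = isNull[i + offset];
          if (!n) nonNull += 1;
          System.out.print(n ? 0 : 1);
        }
        return nonNull;
      }

      public static void main(String[] args) {
        boolean[] isNull = {false, true, false, false};
        int n = writePresent(false, false, isNull, 0, 4); // prints 1011
        System.out.println(" -> " + n + " non-null rows"); // 3
      }
    }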
+
+ private void removeIsPresentPositions() {
+ for(int i=0; i < rowIndex.getEntryCount(); ++i) {
+ OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
+ List<Long> positions = entry.getPositionsList();
+ // bit streams use 3 positions if uncompressed, 4 if compressed
+ positions = positions.subList(isCompressed ? 4 : 3, positions.size());
+ entry.clearPositions();
+ entry.addAllPositions(positions);
+ }
+ }
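
Each row-index entry lists stream positions in a fixed order, and the PRESENT bitstream contributes the leading 3 positions when uncompressed or 4 when compressed; once that stream is suppressed, those leading positions must be dropped. A small sketch of the trimming on an illustrative entry:

    import java.util.Arrays;
    import java.util.List;

    public class TrimPositions {
      public static void main(String[] args) {
        // 4 positions from a compressed PRESENT stream, then the data
        // stream's positions (values illustrative).
        List<Long> positions = Arrays.asList(0L, 0L, 0L, 0L, 1024L, 7L);
        boolean isCompressed = true;
        List<Long> trimmed =
            positions.subList(isCompressed ? 4 : 3, positions.size());
        System.out.println(trimmed); // [1024, 7]
      }
    }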
+
+ /**
+ * Write the stripe out to the file.
+ * @param builder the stripe footer that contains the information about the
+ * layout of the stripe. The TreeWriter is required to update
+ * the footer with its information.
+ * @param requiredIndexEntries the number of index entries that are
+ * required. This is used to verify that the
+ * row index is well formed.
+ * @throws IOException
+ */
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ if (isPresent != null) {
+ isPresent.flush();
+
+ // if no nulls are found in a stream, then suppress the stream
+ if(!foundNulls) {
+ isPresentOutStream.suppress();
+ // since isPresent bitstream is suppressed, update the index to
+ // remove the positions of the isPresent stream
+ if (rowIndexStream != null) {
+ removeIsPresentPositions();
+ }
+ }
+ }
+
+ // merge the stripe-level column statistics into the file statistics
+ // and record them in the stripe statistics
+ OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
+ writeStripeStatistics(stripeStatsBuilder, this);
+ stripeStatsBuilders.add(stripeStatsBuilder);
+
+ // reset the flag for next stripe
+ foundNulls = false;
+
+ builder.addColumns(getEncoding());
+ if (streamFactory.hasWriterTimeZone()) {
+ builder.setWriterTimezone(TimeZone.getDefault().getID());
+ }
+ if (rowIndexStream != null) {
+ if (rowIndex.getEntryCount() != requiredIndexEntries) {
+ throw new IllegalArgumentException("Column has wrong number of " +
+ "index entries found: " + rowIndex.getEntryCount() + " expected: " +
+ requiredIndexEntries);
+ }
+ rowIndex.build().writeTo(rowIndexStream);
+ rowIndexStream.flush();
+ }
+ rowIndex.clear();
+ rowIndexEntry.clear();
+
+ // write the bloom filter to out stream
+ if (bloomFilterStream != null) {
+ bloomFilterIndex.build().writeTo(bloomFilterStream);
+ bloomFilterStream.flush();
+ bloomFilterIndex.clear();
+ bloomFilterEntry.clear();
+ }
+ }
+
+ private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
+ TreeWriter treeWriter) {
+ treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
+ builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
+ treeWriter.stripeColStatistics.reset();
+ for (TreeWriter child : treeWriter.getChildrenWriters()) {
+ writeStripeStatistics(builder, child);
+ }
+ }
+
+ TreeWriter[] getChildrenWriters() {
+ return childrenWriters;
+ }
+
+ /**
+ * Get the encoding for this column.
+ * @return the information about the encoding of this column
+ */
+ OrcProto.ColumnEncoding getEncoding() {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ /**
+ * Create a row index entry with the previous location and the current
+ * index statistics. Also merges the index statistics into the stripe
+ * statistics before they are cleared. Finally, it records the start of the
+ * next index and ensures all of the children columns also create an entry.
+ * @throws IOException
+ */
+ void createRowIndexEntry() throws IOException {
+ stripeColStatistics.merge(indexStatistics);
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ rowIndex.addEntry(rowIndexEntry);
+ rowIndexEntry.clear();
+ addBloomFilterEntry();
+ recordPosition(rowIndexPosition);
+ for(TreeWriter child: childrenWriters) {
+ child.createRowIndexEntry();
+ }
+ }
+
+ void addBloomFilterEntry() {
+ if (createBloomFilter) {
+ bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
+ bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
+ bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
+ bloomFilter.reset();
+ bloomFilterEntry.clear();
+ }
+ }
+
+ /**
+ * Record the current position in each of this column's streams.
+ * @param recorder where should the locations be recorded
+ * @throws IOException
+ */
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ if (isPresent != null) {
+ isPresent.getPosition(recorder);
+ }
+ }
+
+ /**
+ * Estimate how much memory the writer is consuming excluding the streams.
+ * @return the number of bytes.
+ */
+ long estimateMemory() {
+ long result = 0;
+ for (TreeWriter child: childrenWriters) {
+ result += child.estimateMemory();
+ }
+ return result;
+ }
+ }
+
+ private static class BooleanTreeWriter extends TreeWriter {
+ private final BitFieldWriter writer;
+
+ BooleanTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ PositionedOutputStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.writer = new BitFieldWriter(out, 1);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int value = vec.vector[0] == 0 ? 0 : 1;
+ indexStatistics.updateBoolean(value != 0, length);
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int value = vec.vector[i + offset] == 0 ? 0 : 1;
+ writer.write(value);
+ indexStatistics.updateBoolean(value != 0, 1);
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class ByteTreeWriter extends TreeWriter {
+ private final RunLengthByteWriter writer;
+
+ ByteTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.writer = new RunLengthByteWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA));
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte value = (byte) vec.vector[0];
+ indexStatistics.updateInteger(value, length);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ byte value = (byte) vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateInteger(value, 1);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class IntegerTreeWriter extends TreeWriter {
+ private final IntegerWriter writer;
+ private boolean isDirectV2 = true;
+
+ IntegerTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ OutStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ long value = vec.vector[0];
+ indexStatistics.updateInteger(value, length);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ long value = vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateInteger(value, 1);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
+ private static class FloatTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+ private final SerializationUtils utils;
+
+ FloatTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.utils = new SerializationUtils();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DoubleColumnVector vec = (DoubleColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ float value = (float) vec.vector[0];
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ for(int i=0; i < length; ++i) {
+ utils.writeFloat(stream, value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ float value = (float) vec.vector[i + offset];
+ utils.writeFloat(stream, value);
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+ }
+
+ private static class DoubleTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+ private final SerializationUtils utils;
+
+ DoubleTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.utils = new SerializationUtils();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DoubleColumnVector vec = (DoubleColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ double value = vec.vector[0];
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ for(int i=0; i < length; ++i) {
+ utils.writeDouble(stream, value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ double value = vec.vector[i + offset];
+ utils.writeDouble(stream, value);
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ }
+ }
+
+ private static abstract class StringBaseTreeWriter extends TreeWriter {
+ private static final int INITIAL_DICTIONARY_SIZE = 4096;
+ private final OutStream stringOutput;
+ private final IntegerWriter lengthOutput;
+ private final IntegerWriter rowOutput;
+ protected final StringRedBlackTree dictionary =
+ new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
+ protected final DynamicIntArray rows = new DynamicIntArray();
+ protected final PositionedOutputStream directStreamOutput;
+ protected final IntegerWriter directLengthOutput;
+ private final List<OrcProto.RowIndexEntry> savedRowIndex =
+ new ArrayList<OrcProto.RowIndexEntry>();
+ private final boolean buildIndex;
+ private final List<Long> rowIndexValueCount = new ArrayList<Long>();
+ // If the number of keys in a dictionary is greater than this fraction of
+ // the total number of non-null rows, turn off dictionary encoding.
+ private final double dictionaryKeySizeThreshold;
+ protected boolean useDictionaryEncoding = true;
+ private boolean isDirectV2 = true;
+ private boolean doneDictionaryCheck;
+ private final boolean strideDictionaryCheck;
+
+ StringBaseTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ stringOutput = writer.createStream(id,
+ OrcProto.Stream.Kind.DICTIONARY_DATA);
+ lengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ rowOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+ buildIndex = writer.buildIndex();
+ directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ directLengthOutput = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ Configuration conf = writer.getConfiguration();
+ dictionaryKeySizeThreshold =
+ OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+ strideDictionaryCheck =
+ OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
+ doneDictionaryCheck = false;
+ }
+
+ private boolean checkDictionaryEncoding() {
+ if (!doneDictionaryCheck) {
+ // Set the flag for whether to use dictionary encoding: use it only if
+ // the fraction of distinct keys over the number of non-null rows is at
+ // most the configured threshold
+ float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
+ useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
+ doneDictionaryCheck = true;
+ }
+ return useDictionaryEncoding;
+ }
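
In other words, with a threshold of, say, 0.8, a stripe with 1000 non-null rows and 300 distinct keys (ratio 0.3) keeps dictionary encoding, while 900 distinct keys (ratio 0.9) would switch the column to direct encoding. The check in miniature (values illustrative):

    public class DictionaryCheck {
      public static void main(String[] args) {
        double threshold = 0.8; // OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD
        int distinctKeys = 300; // dictionary.size()
        int nonNullRows = 1000; // rows.size()
        float ratio = nonNullRows > 0
            ? (float) distinctKeys / nonNullRows : 0.0f;
        System.out.println(ratio <= threshold); // true -> keep dictionary
      }
    }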
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ // if the stripe has fewer rows than dictionaryCheckAfterRows, the
+ // dictionary check will not have happened yet, so do it here
+ checkDictionaryEncoding();
+
+ if (useDictionaryEncoding) {
+ flushDictionary();
+ } else {
+ // flush out any leftover entries from the dictionary
+ if (rows.size() > 0) {
+ flushDictionary();
+ }
+
+ // suppress the stream for every stripe if dictionary is disabled
+ stringOutput.suppress();
+ }
+
+ // we need to build the row index before calling super, since it
+ // writes it out.
+ super.writeStripe(builder, requiredIndexEntries);
+ stringOutput.flush();
+ lengthOutput.flush();
+ rowOutput.flush();
+ directStreamOutput.flush();
+ directLengthOutput.flush();
+ // reset all of the fields to be ready for the next stripe.
+ dictionary.clear();
+ savedRowIndex.clear();
+ rowIndexValueCount.clear();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(0L);
+
+ if (!useDictionaryEncoding) {
+ // record the start positions of the first index stride of the next
+ // stripe, i.e. the beginning of the direct streams, when dictionary
+ // encoding is disabled
+ recordDirectStreamPosition();
+ }
+ }
+
+ private void flushDictionary() throws IOException {
+ final int[] dumpOrder = new int[dictionary.size()];
+
+ if (useDictionaryEncoding) {
+ // Write the dictionary by traversing the red-black tree, writing out
+ // the bytes and lengths, and creating the map from the original order
+ // to the final sorted order.
+
+ dictionary.visit(new StringRedBlackTree.Visitor() {
+ private int currentId = 0;
+ @Override
+ public void visit(StringRedBlackTree.VisitorContext context
+ ) throws IOException {
+ context.writeBytes(stringOutput);
+ lengthOutput.write(context.getLength());
+ dumpOrder[context.getOriginalPosition()] = currentId++;
+ }
+ });
+ } else {
+ // for direct encoding, we don't want the dictionary data stream
+ stringOutput.suppress();
+ }
+ int length = rows.size();
+ int rowIndexEntry = 0;
+ OrcProto.RowIndex.Builder rowIndex = getRowIndex();
+ Text text = new Text();
+ // write the values translated into the dump order.
+ for(int i = 0; i <= length; ++i) {
+ // now that we are writing out the row values, we can finalize the
+ // row index
+ if (buildIndex) {
+ while (i == rowIndexValueCount.get(rowIndexEntry) &&
+ rowIndexEntry < savedRowIndex.size()) {
+ OrcProto.RowIndexEntry.Builder base =
+ savedRowIndex.get(rowIndexEntry++).toBuilder();
+ if (useDictionaryEncoding) {
+ rowOutput.getPosition(new RowIndexPositionRecorder(base));
+ } else {
+ PositionRecorder posn = new RowIndexPositionRecorder(base);
+ directStreamOutput.getPosition(posn);
+ directLengthOutput.getPosition(posn);
+ }
+ rowIndex.addEntry(base.build());
+ }
+ }
+ if (i != length) {
+ if (useDictionaryEncoding) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ } else {
+ dictionary.getText(text, rows.get(i));
+ directStreamOutput.write(text.getBytes(), 0, text.getLength());
+ directLengthOutput.write(text.getLength());
+ }
+ }
+ }
+ rows.clear();
+ }
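
Because flushDictionary emits the dictionary in sorted order, each row's original dictionary id must be translated through dumpOrder before being written. A standalone sketch of building that mapping, with a TreeMap standing in for the red-black tree (names illustrative):

    import java.util.Arrays;
    import java.util.TreeMap;

    public class DumpOrderDemo {
      public static void main(String[] args) {
        // Keys mapped to their insertion-order ids.
        TreeMap<String, Integer> dictionary = new TreeMap<String, Integer>();
        dictionary.put("pear", 0);
        dictionary.put("apple", 1);
        dictionary.put("orange", 2);

        // Visit in sorted order, recording the sorted id per original id.
        int[] dumpOrder = new int[dictionary.size()];
        int currentId = 0;
        for (int originalId : dictionary.values()) {
          dumpOrder[originalId] = currentId++;
        }
        System.out.println(Arrays.toString(dumpOrder)); // [2, 0, 1]

        // Rows that referenced "pear" (original id 0) are written as 2.
        int[] rows = {0, 1, 0, 2};
        for (int r : rows) {
          System.out.print(dumpOrder[r] + " "); // 2 0 2 1
        }
      }
    }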
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ // Returns the encoding used for the last call to writeStripe
+ if (useDictionaryEncoding) {
+ if(isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
+ setDictionarySize(dictionary.size()).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DICTIONARY).
+ setDictionarySize(dictionary.size()).build();
+ } else {
+ if(isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder().setKind(
+ OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+ }
+
+ /**
+ * This method doesn't call the super method, because unlike most of the
+ * other TreeWriters, this one can't record the position in the streams
+ * until the stripe is being flushed. Therefore it saves all of the entries
+ * and augments them with the final information as the stripe is written.
+ * @throws IOException
+ */
+ @Override
+ void createRowIndexEntry() throws IOException {
+ getStripeStatistics().merge(indexStatistics);
+ OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
+ rowIndexEntry.setStatistics(indexStatistics.serialize());
+ indexStatistics.reset();
+ OrcProto.RowIndexEntry base = rowIndexEntry.build();
+ savedRowIndex.add(base);
+ rowIndexEntry.clear();
+ addBloomFilterEntry();
+ recordPosition(rowIndexPosition);
+ rowIndexValueCount.add(Long.valueOf(rows.size()));
+ if (strideDictionaryCheck) {
+ checkDictionaryEncoding();
+ }
+ if (!useDictionaryEncoding) {
+ if (rows.size() > 0) {
+ flushDictionary();
+ // just record the start positions of next index stride
+ recordDirectStreamPosition();
+ } else {
+ // record the start positions of next index stride
+ recordDirectStreamPosition();
+ getRowIndex().addEntry(base);
+ }
+ }
+ }
+
+ private void recordDirectStreamPosition() throws IOException {
+ directStreamOutput.getPosition(rowIndexPosition);
+ directLengthOutput.getPosition(rowIndexPosition);
+ }
+
+ @Override
+ long estimateMemory() {
+ return rows.getSizeInBytes() + dictionary.getSizeInBytes();
+ }
+ }
+
+ private static class StringTreeWriter extends StringBaseTreeWriter {
+ StringTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ if (useDictionaryEncoding) {
+ int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
+ for(int i=0; i < length; ++i) {
+ rows.add(id);
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ directStreamOutput.write(vec.vector[0], vec.start[0],
+ vec.length[0]);
+ directLengthOutput.write(vec.length[0]);
+ }
+ }
+ indexStatistics.updateString(vec.vector[0], vec.start[0],
+ vec.length[0], length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ if (useDictionaryEncoding) {
+ rows.add(dictionary.add(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]));
+ } else {
+ directStreamOutput.write(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ directLengthOutput.write(vec.length[offset + i]);
+ }
+ indexStatistics.updateString(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i], 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Under the covers, char is written to ORC the same way as string.
+ */
+ private static class CharTreeWriter extends StringBaseTreeWriter {
+ private final int itemLength;
+ private final byte[] padding;
+
+ CharTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ itemLength = schema.getMaxLength();
+ padding = new byte[itemLength];
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte[] ptr;
+ int ptrOffset;
+ if (vec.length[0] >= itemLength) {
+ ptr = vec.vector[0];
+ ptrOffset = vec.start[0];
+ } else {
+ ptr = padding;
+ ptrOffset = 0;
+ System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
+ vec.length[0]);
+ Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
+ }
+ if (useDictionaryEncoding) {
+ int id = dictionary.add(ptr, ptrOffset, itemLength);
+ for(int i=0; i < length; ++i) {
+ rows.add(id);
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ directStreamOutput.write(ptr, ptrOffset, itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ }
+ indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ byte[] ptr;
+ int ptrOffset;
+ if (vec.length[offset + i] >= itemLength) {
+ ptr = vec.vector[offset + i];
+ ptrOffset = vec.start[offset + i];
+ } else {
+ // it is too short, so copy it into the padding buffer and pad it
+ ptr = padding;
+ ptrOffset = 0;
+ System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
+ ptr, 0, vec.length[offset + i]);
+ Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
+ }
+ if (useDictionaryEncoding) {
+ rows.add(dictionary.add(ptr, ptrOffset, itemLength));
+ } else {
+ directStreamOutput.write(ptr, ptrOffset, itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+ }
+ }
+ }
+ }
+ }
+ }
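
The effect of the padding logic above, in isolation: a char(n) value shorter than n is space-padded, and a longer one is cut to n bytes, before being added to the dictionary or written directly. A minimal byte-level sketch, mirroring the writer (not code from the patch):

    import java.util.Arrays;

    public class CharPad {
      // Space-pad or truncate to itemLength, mirroring char(n) handling.
      static byte[] fit(byte[] value, int itemLength) {
        byte[] out = new byte[itemLength];
        Arrays.fill(out, (byte) ' ');
        System.arraycopy(value, 0, out, 0,
            Math.min(value.length, itemLength));
        return out;
      }

      public static void main(String[] args) throws Exception {
        byte[] padded = fit("ab".getBytes("UTF-8"), 5);
        System.out.println("[" + new String(padded, "UTF-8") + "]"); // [ab   ]
      }
    }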
+
+ /**
+ * Under the covers, varchar is written to ORC the same way as string.
+ */
+ private static class VarcharTreeWriter extends StringBaseTreeWriter {
+ private final int maxLength;
+
+ VarcharTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ maxLength = schema.getMaxLength();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int itemLength = Math.min(vec.length[0], maxLength);
+ if (useDictionaryEncoding) {
+ int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+ for(int i=0; i < length; ++i) {
+ rows.add(id);
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ directStreamOutput.write(vec.vector[0], vec.start[0],
+ itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ }
+ indexStatistics.updateString(vec.vector[0], vec.start[0],
+ itemLength, length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int itemLength = Math.min(vec.length[offset + i], maxLength);
+ if (useDictionaryEncoding) {
+ rows.add(dictionary.add(vec.vector[offset + i],
+ vec.start[offset + i], itemLength));
+ } else {
+ directStreamOutput.write(vec.vector[offset + i],
+ vec.start[offset + i], itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ indexStatistics.updateString(vec.vector[offset + i],
+ vec.start[offset + i], itemLength, 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], itemLength);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private static class BinaryTreeWriter extends TreeWriter {
+ private final PositionedOutputStream stream;
+ private final IntegerWriter length;
+ private boolean isDirectV2 = true;
+
+ BinaryTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.stream = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.length = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ for(int i=0; i < length; ++i) {
+ stream.write(vec.vector[0], vec.start[0],
+ vec.length[0]);
+ this.length.write(vec.length[0]);
+ }
+ indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+ vec.length[0], length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ stream.write(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ this.length.write(vec.length[offset + i]);
+ indexStatistics.updateBinary(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i], 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ stream.flush();
+ length.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ stream.getPosition(recorder);
+ length.getPosition(recorder);
+ }
+ }
+
+ public static final int MILLIS_PER_SECOND = 1000;
+ static final int NANOS_PER_SECOND = 1000000000;
+ static final int MILLIS_PER_NANO = 1000000;
+ public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
+
+ private static class TimestampTreeWriter extends TreeWriter {
+ private final IntegerWriter seconds;
+ private final IntegerWriter nanos;
+ private final boolean isDirectV2;
+ private final long base_timestamp;
+
+ TimestampTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.seconds = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
+ this.nanos = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ // computed in the default time zone, which unit tests may override
+ this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
+ writer.useWriterTimeZone(true);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ long value = vec.vector[0];
+ long valueMillis = value / MILLIS_PER_NANO;
+ indexStatistics.updateTimestamp(valueMillis);
+ if (createBloomFilter) {
+ bloomFilter.addLong(valueMillis);
+ }
+ final long secs = value / NANOS_PER_SECOND - base_timestamp;
+ int repeatedNanos = (int) (value % NANOS_PER_SECOND);
+ if (repeatedNanos < 0) {
+ // normalize negative remainders for pre-epoch values, as the
+ // non-repeating path below does
+ repeatedNanos += NANOS_PER_SECOND;
+ }
+ final long nano = formatNanos(repeatedNanos);
+ for(int i=0; i < length; ++i) {
+ seconds.write(secs);
+ nanos.write(nano);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ long value = vec.vector[i + offset];
+ long valueMillis = value / MILLIS_PER_NANO;
+ long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
+ int valueNanos = (int) (value % NANOS_PER_SECOND);
+ if (valueNanos < 0) {
+ valueNanos += NANOS_PER_SECOND;
+ }
+ seconds.write(valueSecs);
+ nanos.write(formatNanos(valueNanos));
+ indexStatistics.updateTimestamp(valueMillis);
+ if (createBloomFilter) {
+ bloomFilter.addLong(valueMillis);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ seconds.flush();
+ nanos.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ private static long formatNanos(int nanos) {
+ if (nanos == 0) {
+ return 0;
+ } else if (nanos % 100 != 0) {
+ return ((long) nanos) << 3;
+ } else {
+ nanos /= 100;
+ int trailingZeros = 1;
+ while (nanos % 10 == 0 && trailingZeros < 7) {
+ nanos /= 10;
+ trailingZeros += 1;
+ }
+ return ((long) nanos) << 3 | trailingZeros;
+ }
+ }
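
formatNanos packs the nanosecond value and a trailing-zero count into a single long: the low 3 bits record how many decimal zeros were stripped (1 means two zeros, each further increment one more, up to 7), and the remaining bits hold the surviving digits. A worked round trip; the decoder is a sketch written to invert the method above, not code from the patch:

    public class NanoCodec {
      // Same algorithm as formatNanos above.
      static long encode(int nanos) {
        if (nanos == 0) {
          return 0;
        } else if (nanos % 100 != 0) {
          return ((long) nanos) << 3;
        }
        nanos /= 100;
        int trailingZeros = 1;
        while (nanos % 10 == 0 && trailingZeros < 7) {
          nanos /= 10;
          trailingZeros += 1;
        }
        return ((long) nanos) << 3 | trailingZeros;
      }

      // Inverse: multiply back zeros + 1 stripped factors of ten.
      static int decode(long encoded) {
        int zeros = (int) (encoded & 7);
        int result = (int) (encoded >>> 3);
        if (zeros != 0) {
          for (int i = 0; i <= zeros; ++i) {
            result *= 10;
          }
        }
        return result;
      }

      public static void main(String[] args) {
        long e = encode(800000);                    // 8 << 3 | 4 = 68
        System.out.println(e + " -> " + decode(e)); // 68 -> 800000
      }
    }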
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ seconds.getPosition(recorder);
+ nanos.getPosition(recorder);
+ }
+ }
+
+ private static class DateTreeWriter extends TreeWriter {
+ private final IntegerWriter writer;
+ private final boolean isDirectV2;
+
+ DateTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ OutStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ this.writer = createIntegerWriter(out, true, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int value = (int) vec.vector[0];
+ indexStatistics.updateDate(value);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int value = (int) vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateDate(value);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+ }
+
+ private static class DecimalTreeWriter extends TreeWriter {
+ private final PositionedOutputStream valueStream;
+ private final IntegerWriter scaleStream;
+ private final boolean isDirectV2;
+
+ DecimalTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
+ this.scaleStream = createIntegerWriter(writer.createStream(id,
+ OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DecimalColumnVector vec = (DecimalColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ HiveDecimal value = vec.vector[0].getHiveDecimal();
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilter.addString(value.toString());
+ }
+ for(int i=0; i < length; ++i) {
+ SerializationUtils.writeBigInteger(valueStream,
+ value.unscaledValue());
+ scaleStream.write(value.scale());
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
+ SerializationUtils.writeBigInteger(valueStream,
+ value.unscaledValue());
+ scaleStream.write(value.scale());
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilter.addString(value.toString());
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ valueStream.flush();
+ scaleStream.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ valueStream.getPosition(recorder);
+ scaleStream.getPosition(recorder);
+ }
+ }
+
+ private static class StructTreeWriter extends TreeWriter {
+ StructTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ List<TypeDescription> children = schema.getChildren();
+ childrenWriters = new TreeWriter[children.size()];
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] = createTreeWriter(
+ children.get(i), writer,
+ true);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
+ // update the statistics for the root column
+ indexStatistics.increment(length);
+ // I'm assuming that the root column isn't nullable so that I don't need
+ // to update isPresent.
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+ }
+ }
+
+ private static void writeFields(StructColumnVector vector,
+ TreeWriter[] childrenWriters,
+ int offset, int length) throws IOException {
+ for(int field=0; field < childrenWriters.length; ++field) {
+ childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+ }
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ StructColumnVector vec = (StructColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ writeFields(vec, childrenWriters, offset, length);
+ }
+ } else if (vector.noNulls) {
+ writeFields(vec, childrenWriters, offset, length);
+ } else {
+ // write the records in runs
+ int currentRun = 0;
+ boolean started = false;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ if (!started) {
+ started = true;
+ currentRun = i;
+ }
+ } else if (started) {
+ started = false;
+ writeFields(vec, childrenWriters, offset + currentRun,
+ i - currentRun);
+ }
+ }
+ if (started) {
+ writeFields(vec, childrenWriters, offset + currentRun,
+ length - currentRun);
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+ }
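
For structs with scattered nulls, writeBatch forwards the children's writes over maximal runs of non-null rows rather than row by row. The run detection in isolation:

    public class NullRuns {
      public static void main(String[] args) {
        boolean[] isNull = {false, false, true, false, true, false, false};
        int length = isNull.length;
        int currentRun = 0;
        boolean started = false;
        for (int i = 0; i < length; ++i) {
          if (!isNull[i]) {
            if (!started) {
              started = true;
              currentRun = i;
            }
          } else if (started) {
            started = false;
            System.out.println("write rows [" + currentRun + ", " + i + ")");
          }
        }
        if (started) {
          System.out.println("write rows [" + currentRun + ", " + length + ")");
        }
        // write rows [0, 2)
        // write rows [3, 4)
        // write rows [5, 7)
      }
    }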
+
+ private static class ListTreeWriter extends TreeWriter {
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
+
+ ListTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ childrenWriters = new TreeWriter[1];
+ childrenWriters[0] =
+ createTreeWriter(schema.getChildren().get(0), writer, true);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ ListColumnVector vec = (ListColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int childOffset = (int) vec.offsets[0];
+ int childLength = (int) vec.lengths[0];
+ for(int i=0; i < length; ++i) {
+ lengths.write(childLength);
+ childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+ }
+ if (createBloomFilter) {
+ bloomFilter.addLong(childLength);
+ }
+ }
+ } else {
+ // write the elements in runs
+ int currentOffset = 0;
+ int currentLength = 0;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ int nextLength = (int) vec.lengths[offset + i];
+ int nextOffset = (int) vec.offsets[offset + i];
+ lengths.write(nextLength);
+ if (currentLength == 0) {
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else if (currentOffset + currentLength != nextOffset) {
+ childrenWriters[0].writeBatch(vec.child, currentOffset,
+ currentLength);
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else {
+ currentLength += nextLength;
+ }
+ }
+ }
+ if (currentLength != 0) {
+ childrenWriters[0].writeBatch(vec.child, currentOffset,
+ currentLength);
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ lengths.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+ }
+
+ private static class MapTreeWriter extends TreeWriter {
+ private final IntegerWriter lengths;
+ private final boolean isDirectV2;
+
+ MapTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ this.isDirectV2 = isNewWriteFormat(writer);
+ childrenWriters = new TreeWriter[2];
+ List<TypeDescription> children = schema.getChildren();
+ childrenWriters[0] =
+ createTreeWriter(children.get(0), writer, true);
+ childrenWriters[1] =
+ createTreeWriter(children.get(1), writer, true);
+ lengths = createIntegerWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ OrcProto.ColumnEncoding getEncoding() {
+ if (isDirectV2) {
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
+ }
+ return OrcProto.ColumnEncoding.newBuilder()
+ .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ MapColumnVector vec = (MapColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int childOffset = (int) vec.offsets[0];
+ int childLength = (int) vec.lengths[0];
+ for(int i=0; i < length; ++i) {
+ lengths.write(childLength);
+ childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
+ childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+ }
+ if (createBloomFilter) {
+ bloomFilter.addLong(childLength);
+ }
+ }
+ } else {
+ // write the elements in runs
+ int currentOffset = 0;
+ int currentLength = 0;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ int nextLength = (int) vec.lengths[offset + i];
+ int nextOffset = (int) vec.offsets[offset + i];
+ lengths.write(nextLength);
+ if (currentLength == 0) {
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else if (currentOffset + currentLength != nextOffset) {
+ childrenWriters[0].writeBatch(vec.keys, currentOffset,
+ currentLength);
+ childrenWriters[1].writeBatch(vec.values, currentOffset,
+ currentLength);
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else {
+ currentLength += nextLength;
+ }
+ }
+ }
+ if (currentLength != 0) {
+ childrenWriters[0].writeBatch(vec.keys, currentOffset,
+ currentLength);
+ childrenWriters[1].writeBatch(vec.values, currentOffset,
+ currentLength);
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ lengths.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ lengths.getPosition(recorder);
+ }
+ }
+
+ private static class UnionTreeWriter extends TreeWriter {
+ private final RunLengthByteWriter tags;
+
+ UnionTreeWriter(int columnId,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, schema, writer, nullable);
+ List<TypeDescription> children = schema.getChildren();
+ childrenWriters = new TreeWriter[children.size()];
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i] =
+ createTreeWriter(children.get(i), writer, true);
+ }
+ tags =
+ new RunLengthByteWriter(writer.createStream(columnId,
+ OrcProto.Stream.Kind.DATA));
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ UnionColumnVector vec = (UnionColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte tag = (byte) vec.tags[0];
+ for(int i=0; i < length; ++i) {
+ tags.write(tag);
+ }
+ if (createBloomFilter) {
+ bloomFilter.addLong(tag);
+ }
+ childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+ }
+ } else {
+ // write the records in runs of the same tag
+ byte prevTag = 0;
+ int currentRun = 0;
+ boolean started = false;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ byte tag = (byte) vec.tags[offset + i];
+ tags.write(tag);
+ if (!started) {
+ started = true;
+ currentRun = i;
+ prevTag = tag;
+ } else if (tag != prevTag) {
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, i - currentRun);
+ currentRun = i;
+ prevTag = tag;
+ }
+ } else if (started) {
+ started = false;
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, i - currentRun);
+ }
+ }
+ if (started) {
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, length - currentRun);
+ }
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ tags.flush();
+ for(TreeWriter child: childrenWriters) {
+ child.writeStripe(builder, requiredIndexEntries);
+ }
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ tags.getPosition(recorder);
+ }
+ }
+
+ private static TreeWriter createTreeWriter(TypeDescription schema,
+ StreamFactory streamFactory,
+ boolean nullable) throws IOException {
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return new BooleanTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case BYTE:
+ return new ByteTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case SHORT:
+ case INT:
+ case LONG:
+ return new IntegerTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case FLOAT:
+ return new FloatTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case DOUBLE:
+ return new DoubleTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case STRING:
+ return new StringTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case CHAR:
+ return new CharTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case VARCHAR:
+ return new VarcharTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case BINARY:
+ return new BinaryTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case TIMESTAMP:
+ return new TimestampTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case DATE:
+ return new DateTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case DECIMAL:
+ return new DecimalTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case STRUCT:
+ return new StructTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case MAP:
+ return new MapTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case LIST:
+ return new ListTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ case UNION:
+ return new UnionTreeWriter(streamFactory.getNextColumnId(),
+ schema, streamFactory, nullable);
+ default:
+ throw new IllegalArgumentException("Bad category: " +
+ schema.getCategory());
+ }
+ }
+
+ private static void writeTypes(OrcProto.Footer.Builder builder,
+ TypeDescription schema) {
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ List<TypeDescription> children = schema.getChildren();
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(OrcProto.Type.Kind.VARCHAR);
+ type.setMaximumLength(schema.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(schema.getPrecision());
+ type.setScale(schema.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(children.get(0).getId());
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TypeD
<TRUNCATED>
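The createTreeWriter factory above dispatches purely on
TypeDescription.getCategory(), so the writer tree always mirrors the schema
tree, and writeTypes serializes that same tree into the file footer. As a
rough caller-side sketch (illustration only, not part of this patch; the
builder methods are those of org.apache.orc.TypeDescription):

    // struct<name:string,score:double,tags:array<string>>
    TypeDescription schema = TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("score", TypeDescription.createDouble())
        .addField("tags",
            TypeDescription.createList(TypeDescription.createString()));
    // createTreeWriter would produce a StructTreeWriter whose children are
    // a StringTreeWriter, a DoubleTreeWriter, and a ListTreeWriter wrapping
    // another StringTreeWriter; writeTypes emits one OrcProto.Type per node.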
[3/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java b/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java
deleted file mode 100644
index 878efbe..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/filters/BloomFilterIO.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io.filters;
-
-import org.apache.orc.OrcProto;
-import org.apache.hive.common.util.BloomFilter;
-
-import com.google.common.primitives.Longs;
-
-public class BloomFilterIO extends BloomFilter {
-
- public BloomFilterIO(long expectedEntries) {
- super(expectedEntries, DEFAULT_FPP);
- }
-
- public BloomFilterIO(long expectedEntries, double fpp) {
- super(expectedEntries, fpp);
- }
-
-/**
- * Initializes the BloomFilter from the given Orc BloomFilter
- */
- public BloomFilterIO(OrcProto.BloomFilter bloomFilter) {
- this.bitSet = new BitSet(Longs.toArray(bloomFilter.getBitsetList()));
- this.numHashFunctions = bloomFilter.getNumHashFunctions();
- this.numBits = (int) this.bitSet.bitSize();
- }
-}
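The class itself is unchanged by this move: the import rewrites in the files
below simply pick it up from org.apache.orc. A minimal usage sketch, assuming
the relocated class keeps the constructors shown above and the
addLong/testLong methods inherited from BloomFilter:

    import org.apache.orc.BloomFilterIO;

    // sized for ~10,000 entries at the default false-positive probability
    BloomFilterIO bloom = new BloomFilterIO(10000);
    bloom.addLong(42L);
    boolean hit = bloom.testLong(42L);    // true
    boolean miss = bloom.testLong(43L);   // false, barring a false positive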
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
index a242a37..5bcb281 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.orc.BloomFilterIO;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
index b746390..e3e234e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
@@ -24,8 +24,8 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.codehaus.jettison.json.JSONArray;
+import org.apache.orc.BloomFilterIO;
import org.apache.orc.BinaryColumnStatistics;
import org.apache.orc.BooleanColumnStatistics;
import org.apache.orc.ColumnStatistics;
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 975825a..58e5da1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -24,6 +24,7 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.orc.FileMetadata;
@@ -88,6 +89,11 @@ public final class OrcFile extends org.apache.orc.OrcFile {
public static class WriterOptions extends org.apache.orc.OrcFile.WriterOptions {
private boolean explicitSchema = false;
private ObjectInspector inspector = null;
+ // Setting the default batch size to 1000 makes the memory check at 5000
+ // rows work the same as the row-by-row writer. (If it were the default 1024,
+ // the smallest stripe size would be 5120 rows, which changes the output
+ // of some of the tests.)
+ private int batchSize = 1000;
WriterOptions(Properties tableProperties, Configuration conf) {
super(tableProperties, conf);
@@ -249,6 +255,19 @@ public final class OrcFile extends org.apache.orc.OrcFile {
super.memory(value);
return this;
}
+
+ protected WriterOptions batchSize(int maxSize) {
+ batchSize = maxSize;
+ return this;
+ }
+
+ ObjectInspector getInspector() {
+ return inspector;
+ }
+
+ int getBatchSize() {
+ return batchSize;
+ }
}
/**
@@ -286,16 +305,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
FileSystem fs = opts.getFileSystem() == null ?
path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
- return new WriterImpl(fs, path, opts.getConfiguration(), opts.inspector,
- opts.getSchema(),
- opts.getStripeSize(), opts.getCompress(),
- opts.getBufferSize(), opts.getRowIndexStride(),
- opts.getMemoryManager(), opts.getBlockPadding(),
- opts.getVersion(), opts.getCallback(),
- opts.getEncodingStrategy(),
- opts.getCompressionStrategy(),
- opts.getPaddingTolerance(), opts.getBlockSize(),
- opts.getBloomFilterColumns(), opts.getBloomFilterFpp());
+ return new WriterImpl(fs, path, opts);
}
/**
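Net effect of the OrcFile.java changes: WriterImpl now receives the whole
WriterOptions object instead of fifteen positional arguments, and the new
batchSize option keeps the 5000-row memory check aligned with the old
row-by-row writer (batches of 1000 land exactly on 5000 rows, whereas the
first multiple of 1024 past 5000 is 5120). A hedged caller-side sketch (the
path and inspector are placeholders):

    Configuration conf = new Configuration();
    Writer writer = OrcFile.createWriter(
        new Path("/tmp/example.orc"),            // hypothetical path
        OrcFile.writerOptions(conf)
            .inspector(rowInspector)             // enables addRow(Object)
            .stripeSize(64L * 1024 * 1024));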
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index e31fd0b..30c2fad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -63,6 +63,7 @@ public class ReaderImpl implements Reader {
private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
protected final FileSystem fileSystem;
+ private final long maxLength;
protected final Path path;
protected final org.apache.orc.CompressionKind compressionKind;
protected final CompressionCodec codec;
@@ -329,6 +330,7 @@ public class ReaderImpl implements Reader {
this.fileSystem = fs;
this.path = path;
this.conf = options.getConfiguration();
+ this.maxLength = options.getMaxLength();
FileMetadata fileMetadata = options.getFileMetadata();
if (fileMetadata != null) {
@@ -859,4 +861,17 @@ public class ReaderImpl implements Reader {
public DataReader createDefaultDataReader(boolean useZeroCopy) {
return RecordReaderUtils.createDefaultDataReader(fileSystem, path, useZeroCopy, codec);
}
+
+ @Override
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("ORC Reader(");
+ buffer.append(path);
+ if (maxLength != -1) {
+ buffer.append(", ");
+ buffer.append(maxLength);
+ }
+ buffer.append(")");
+ return buffer.toString();
+ }
}
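With a caller-supplied maxLength (the reader option that bounds how much of
the file is read, e.g. for files still being written to), the new toString
renders as something like "ORC Reader(/path/file.orc, 1048576)"; with the
default of -1 the length is simply omitted.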
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 607003f..a85bfef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.apache.orc.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 1f5927a..92f5ab8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -26,7 +26,7 @@ import java.io.IOException;
public interface Writer extends org.apache.orc.Writer {
/**
- * Add a row to the ORC file.
+ * Add a row to the end of the ORC file.
* @param row the row to add
* @throws IOException
*/
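Part [2/5] below shows the other half of this change: the Hive-level
addRow(Object) no longer serializes rows itself but converts each row into
the columns of a private VectorizedRowBatch and hands full batches to the
core writer's addRowBatch. A caller's view is unchanged; a sketch, assuming
close() drains the internal batch the same way writeIntermediateFooter does:

    for (Object row : rows) {
      writer.addRow(row);  // buffered into the internal VectorizedRowBatch
    }                      // and flushed via addRowBatch() every 1000 rows
    writer.close();        // remaining rows go through flushInternalBatch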
[2/5] hive git commit: HIVE-12055. Move WriterImpl over to orc module.
Posted by om...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/06e39ebe/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 993be71..8cfb402 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -18,52 +18,15 @@
package org.apache.hadoop.hive.ql.io.orc;
-import static com.google.common.base.Preconditions.checkArgument;
-
import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.TimeZone;
-import java.util.TreeMap;
+import java.util.Set;
-import org.apache.orc.BinaryColumnStatistics;
-import org.apache.orc.impl.BitFieldWriter;
-import org.apache.orc.impl.ColumnStatisticsImpl;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.impl.DynamicIntArray;
-import org.apache.orc.impl.IntegerWriter;
-import org.apache.orc.impl.MemoryManager;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcUtils;
-import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.PositionRecorder;
-import org.apache.orc.impl.PositionedOutputStream;
-import org.apache.orc.impl.RunLengthByteWriter;
-import org.apache.orc.impl.RunLengthIntegerWriter;
-import org.apache.orc.impl.RunLengthIntegerWriterV2;
-import org.apache.orc.impl.SerializationUtils;
-import org.apache.orc.impl.SnappyCodec;
-import org.apache.orc.impl.StreamName;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.impl.StringRedBlackTree;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ZlibCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -74,10 +37,6 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
-import org.apache.orc.CompressionCodec.Modifier;
-import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -98,17 +57,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspect
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
-import org.apache.hadoop.hive.shims.HadoopShims;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -128,3184 +78,240 @@ import com.google.protobuf.CodedOutputStream;
* thread as well.
*
*/
-public class WriterImpl implements Writer, MemoryManager.Callback {
-
- private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
- static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
-
- private static final int HDFS_BUFFER_SIZE = 256 * 1024;
- private static final int MIN_ROW_INDEX_STRIDE = 1000;
+public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer {
- // threshold above which buffer size will be automatically resized
- private static final int COLUMN_COUNT_THRESHOLD = 1000;
-
- private final FileSystem fs;
- private final Path path;
- private final long defaultStripeSize;
- private long adjustedStripeSize;
- private final int rowIndexStride;
- private final CompressionKind compress;
- private final CompressionCodec codec;
- private final boolean addBlockPadding;
- private final int bufferSize;
- private final long blockSize;
- private final double paddingTolerance;
- private final TypeDescription schema;
-
- // the streams that make up the current stripe
- private final Map<StreamName, BufferedStream> streams =
- new TreeMap<StreamName, BufferedStream>();
-
- private FSDataOutputStream rawWriter = null;
- // the compressed metadata information outStream
- private OutStream writer = null;
- // a protobuf outStream around streamFactory
- private CodedOutputStream protobufWriter = null;
- private long headerLength;
- private int columnCount;
- private long rowCount = 0;
- private long rowsInStripe = 0;
- private long rawDataSize = 0;
- private int rowsInIndex = 0;
- private int stripesAtLastFlush = -1;
- private final List<OrcProto.StripeInformation> stripes =
- new ArrayList<OrcProto.StripeInformation>();
- private final Map<String, ByteString> userMetadata =
- new TreeMap<String, ByteString>();
- private final StreamFactory streamFactory = new StreamFactory();
- private final TreeWriter treeWriter;
- private final boolean buildIndex;
- private final MemoryManager memoryManager;
- private final OrcFile.Version version;
- private final Configuration conf;
- private final OrcFile.WriterCallback callback;
- private final OrcFile.WriterContext callbackContext;
- private final OrcFile.EncodingStrategy encodingStrategy;
- private final OrcFile.CompressionStrategy compressionStrategy;
- private final boolean[] bloomFilterColumns;
- private final double bloomFilterFpp;
- private boolean writeTimeZone;
+ private final ObjectInspector inspector;
+ private final VectorizedRowBatch internalBatch;
+ private final StructField[] fields;
WriterImpl(FileSystem fs,
- Path path,
- Configuration conf,
- ObjectInspector inspector,
- TypeDescription schema,
- long stripeSize,
- CompressionKind compress,
- int bufferSize,
- int rowIndexStride,
- MemoryManager memoryManager,
- boolean addBlockPadding,
- OrcFile.Version version,
- OrcFile.WriterCallback callback,
- OrcFile.EncodingStrategy encodingStrategy,
- OrcFile.CompressionStrategy compressionStrategy,
- double paddingTolerance,
- long blockSizeValue,
- String bloomFilterColumnNames,
- double bloomFilterFpp) throws IOException {
- this.fs = fs;
- this.path = path;
- this.conf = conf;
- this.callback = callback;
- this.schema = schema;
- if (callback != null) {
- callbackContext = new OrcFile.WriterContext(){
-
- @Override
- public Writer getWriter() {
- return WriterImpl.this;
- }
- };
- } else {
- callbackContext = null;
- }
- this.adjustedStripeSize = stripeSize;
- this.defaultStripeSize = stripeSize;
- this.version = version;
- this.encodingStrategy = encodingStrategy;
- this.compressionStrategy = compressionStrategy;
- this.addBlockPadding = addBlockPadding;
- this.blockSize = blockSizeValue;
- this.paddingTolerance = paddingTolerance;
- this.compress = compress;
- this.rowIndexStride = rowIndexStride;
- this.memoryManager = memoryManager;
- buildIndex = rowIndexStride > 0;
- codec = createCodec(compress);
- int numColumns = schema.getMaximumId() + 1;
- this.bufferSize = getEstimatedBufferSize(defaultStripeSize,
- numColumns, bufferSize);
- if (version == OrcFile.Version.V_0_11) {
- /* do not write bloom filters for ORC v11 */
- this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
- } else {
- this.bloomFilterColumns =
- OrcUtils.includeColumns(bloomFilterColumnNames, schema);
- }
- this.bloomFilterFpp = bloomFilterFpp;
- treeWriter = createTreeWriter(inspector, schema, streamFactory, false);
- if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
- throw new IllegalArgumentException("Row stride must be at least " +
- MIN_ROW_INDEX_STRIDE);
- }
-
- // ensure that we are able to handle callbacks before we register ourselves
- memoryManager.addWriter(path, stripeSize, this);
- }
-
- @VisibleForTesting
- static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) {
- // The worst case is that there are 2 big streams per column and
- // we want to guarantee that each stream gets ~10 buffers.
- // This keeps buffers small enough that we don't get really small stripe
- // sizes.
- int estBufferSize = (int) (stripeSize / (20 * numColumns));
- estBufferSize = getClosestBufferSize(estBufferSize);
- if (estBufferSize > bs) {
- estBufferSize = bs;
- } else {
- LOG.info("WIDE TABLE - Number of columns: " + numColumns +
- " Chosen compression buffer size: " + estBufferSize);
- }
- return estBufferSize;
- }
-
- private static int getClosestBufferSize(int estBufferSize) {
- final int kb4 = 4 * 1024;
- final int kb8 = 8 * 1024;
- final int kb16 = 16 * 1024;
- final int kb32 = 32 * 1024;
- final int kb64 = 64 * 1024;
- final int kb128 = 128 * 1024;
- final int kb256 = 256 * 1024;
- if (estBufferSize <= kb4) {
- return kb4;
- } else if (estBufferSize > kb4 && estBufferSize <= kb8) {
- return kb8;
- } else if (estBufferSize > kb8 && estBufferSize <= kb16) {
- return kb16;
- } else if (estBufferSize > kb16 && estBufferSize <= kb32) {
- return kb32;
- } else if (estBufferSize > kb32 && estBufferSize <= kb64) {
- return kb64;
- } else if (estBufferSize > kb64 && estBufferSize <= kb128) {
- return kb128;
+ Path path,
+ OrcFile.WriterOptions opts) throws IOException {
+ super(fs, path, opts);
+ this.inspector = opts.getInspector();
+ internalBatch = opts.getSchema().createRowBatch(opts.getBatchSize());
+ if (inspector instanceof StructObjectInspector) {
+ List<? extends StructField> fieldList =
+ ((StructObjectInspector) inspector).getAllStructFieldRefs();
+ fields = new StructField[fieldList.size()];
+ fieldList.toArray(fields);
} else {
- return kb256;
- }
- }
-
- public static CompressionCodec createCodec(CompressionKind kind) {
- switch (kind) {
- case NONE:
- return null;
- case ZLIB:
- return new ZlibCodec();
- case SNAPPY:
- return new SnappyCodec();
- case LZO:
- try {
- Class<? extends CompressionCodec> lzo =
- (Class<? extends CompressionCodec>)
- JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
- return lzo.newInstance();
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("LZO is not available.", e);
- } catch (InstantiationException e) {
- throw new IllegalArgumentException("Problem initializing LZO", e);
- } catch (IllegalAccessException e) {
- throw new IllegalArgumentException("Insufficient access to LZO", e);
- }
- default:
- throw new IllegalArgumentException("Unknown compression codec: " +
- kind);
- }
- }
-
- @Override
- public boolean checkMemory(double newScale) throws IOException {
- long limit = (long) Math.round(adjustedStripeSize * newScale);
- long size = estimateStripeSize();
- if (LOG.isDebugEnabled()) {
- LOG.debug("ORC writer " + path + " size = " + size + " limit = " +
- limit);
- }
- if (size > limit) {
- flushStripe();
- return true;
- }
- return false;
- }
-
- /**
- * This class is used to hold the contents of streams as they are buffered.
- * The TreeWriters write to the outStream and the codec compresses the
- * data as buffers fill up and stores them in the output list. When the
- * stripe is being written, the whole stream is written to the file.
- */
- private class BufferedStream implements OutStream.OutputReceiver {
- private final OutStream outStream;
- private final List<ByteBuffer> output = new ArrayList<ByteBuffer>();
-
- BufferedStream(String name, int bufferSize,
- CompressionCodec codec) throws IOException {
- outStream = new OutStream(name, bufferSize, codec, this);
- }
-
- /**
- * Receive a buffer from the compression codec.
- * @param buffer the buffer to save
- * @throws IOException
- */
- @Override
- public void output(ByteBuffer buffer) {
- output.add(buffer);
- }
-
- /**
- * Get the number of bytes in buffers that are allocated to this stream.
- * @return number of bytes in buffers
- */
- public long getBufferSize() {
- long result = 0;
- for(ByteBuffer buf: output) {
- result += buf.capacity();
- }
- return outStream.getBufferSize() + result;
- }
-
- /**
- * Flush the stream to the codec.
- * @throws IOException
- */
- public void flush() throws IOException {
- outStream.flush();
- }
-
- /**
- * Clear all of the buffers.
- * @throws IOException
- */
- public void clear() throws IOException {
- outStream.clear();
- output.clear();
- }
-
- /**
- * Check the state of the suppress flag in the output stream.
- * @return value of suppress flag
- */
- public boolean isSuppressed() {
- return outStream.isSuppressed();
- }
-
- /**
- * Get the number of bytes that will be written to the output. Assumes
- * the stream has already been flushed.
- * @return the number of bytes
- */
- public long getOutputSize() {
- long result = 0;
- for(ByteBuffer buffer: output) {
- result += buffer.remaining();
- }
- return result;
- }
-
- /**
- * Write the saved compressed buffers to the OutputStream.
- * @param out the stream to write to
- * @throws IOException
- */
- void spillTo(OutputStream out) throws IOException {
- for(ByteBuffer buffer: output) {
- out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
- @Override
- public String toString() {
- return outStream.toString();
- }
- }
-
- /**
- * An output receiver that writes the ByteBuffers to the output stream
- * as they are received.
- */
- private class DirectStream implements OutStream.OutputReceiver {
- private final FSDataOutputStream output;
-
- DirectStream(FSDataOutputStream output) {
- this.output = output;
- }
-
- @Override
- public void output(ByteBuffer buffer) throws IOException {
- output.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- }
-
- private static class RowIndexPositionRecorder implements PositionRecorder {
- private final OrcProto.RowIndexEntry.Builder builder;
-
- RowIndexPositionRecorder(OrcProto.RowIndexEntry.Builder builder) {
- this.builder = builder;
- }
-
- @Override
- public void addPosition(long position) {
- builder.addPositions(position);
+ fields = null;
}
}
- /**
- * Interface from the Writer to the TreeWriters. This limits the visibility
- * that the TreeWriters have into the Writer.
- */
- private class StreamFactory {
- /**
- * Create a stream to store part of a column.
- * @param column the column id for the stream
- * @param kind the kind of stream
- * @return The output outStream that the section needs to be written to.
- * @throws IOException
- */
- public OutStream createStream(int column,
- OrcProto.Stream.Kind kind
- ) throws IOException {
- final StreamName name = new StreamName(column, kind);
- final EnumSet<CompressionCodec.Modifier> modifiers;
-
- switch (kind) {
- case BLOOM_FILTER:
- case DATA:
- case DICTIONARY_DATA:
- if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) {
- modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
- } else {
- modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
- }
- break;
- case LENGTH:
- case DICTIONARY_COUNT:
- case PRESENT:
- case ROW_INDEX:
- case SECONDARY:
- // easily compressed using the fastest modes
- modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
- break;
- default:
- LOG.warn("Missing ORC compression modifiers for " + kind);
- modifiers = null;
- break;
- }
-
- BufferedStream result = streams.get(name);
- if (result == null) {
- result = new BufferedStream(name.toString(), bufferSize,
- codec == null ? codec : codec.modify(modifiers));
- streams.put(name, result);
- }
- return result.outStream;
- }
-
- /**
- * Get the next column id.
- * @return a number from 0 to the number of columns - 1
- */
- public int getNextColumnId() {
- return columnCount++;
- }
-
- /**
- * Get the stride rate of the row index.
- */
- public int getRowIndexStride() {
- return rowIndexStride;
- }
-
- /**
- * Whether we should be building the row index.
- * @return true if we are building the index
- */
- public boolean buildIndex() {
- return buildIndex;
- }
-
- /**
- * Is the ORC file compressed?
- * @return are the streams compressed
- */
- public boolean isCompressed() {
- return codec != null;
- }
-
- /**
- * Get the encoding strategy to use.
- * @return encoding strategy
- */
- public OrcFile.EncodingStrategy getEncodingStrategy() {
- return encodingStrategy;
- }
-
- /**
- * Get the compression strategy to use.
- * @return compression strategy
- */
- public OrcFile.CompressionStrategy getCompressionStrategy() {
- return compressionStrategy;
- }
-
- /**
- * Get the bloom filter columns
- * @return bloom filter columns
- */
- public boolean[] getBloomFilterColumns() {
- return bloomFilterColumns;
- }
-
- /**
- * Get bloom filter false positive percentage.
- * @return fpp
- */
- public double getBloomFilterFPP() {
- return bloomFilterFpp;
- }
-
- /**
- * Get the writer's configuration.
- * @return configuration
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Get the version of the file to write.
- */
- public OrcFile.Version getVersion() {
- return version;
- }
-
- public void useWriterTimeZone(boolean val) {
- writeTimeZone = val;
- }
-
- public boolean hasWriterTimeZone() {
- return writeTimeZone;
- }
- }
+ private static final long NANOS_PER_MILLI = 1000000;
/**
- * The parent class of all of the writers for each column. Each column
- * is written by an instance of this class. The compound types (struct,
- * list, map, and union) have child tree writers that write the child
- * types.
+ * Set the value for a given column within a batch.
+ * @param rowId the row to set
+ * @param column the column to set
+ * @param inspector the object inspector to interpret the obj
+ * @param obj the value to use
*/
- private abstract static class TreeWriter {
- protected final int id;
- protected final ObjectInspector inspector;
- protected final BitFieldWriter isPresent;
- private final boolean isCompressed;
- protected final ColumnStatisticsImpl indexStatistics;
- protected final ColumnStatisticsImpl stripeColStatistics;
- private final ColumnStatisticsImpl fileStatistics;
- protected TreeWriter[] childrenWriters;
- protected final RowIndexPositionRecorder rowIndexPosition;
- private final OrcProto.RowIndex.Builder rowIndex;
- private final OrcProto.RowIndexEntry.Builder rowIndexEntry;
- private final PositionedOutputStream rowIndexStream;
- private final PositionedOutputStream bloomFilterStream;
- protected final BloomFilterIO bloomFilter;
- protected final boolean createBloomFilter;
- private final OrcProto.BloomFilterIndex.Builder bloomFilterIndex;
- private final OrcProto.BloomFilter.Builder bloomFilterEntry;
- private boolean foundNulls;
- private OutStream isPresentOutStream;
- private final List<OrcProto.StripeStatistics.Builder> stripeStatsBuilders;
- private final StreamFactory streamFactory;
-
- /**
- * Create a tree writer.
- * @param columnId the column id of the column to write
- * @param inspector the object inspector to use
- * @param schema the row schema
- * @param streamFactory limited access to the Writer's data.
- * @param nullable can the value be null?
- * @throws IOException
- */
- TreeWriter(int columnId, ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory streamFactory,
- boolean nullable) throws IOException {
- this.streamFactory = streamFactory;
- this.isCompressed = streamFactory.isCompressed();
- this.id = columnId;
- this.inspector = inspector;
- if (nullable) {
- isPresentOutStream = streamFactory.createStream(id,
- OrcProto.Stream.Kind.PRESENT);
- isPresent = new BitFieldWriter(isPresentOutStream, 1);
- } else {
- isPresent = null;
- }
- this.foundNulls = false;
- createBloomFilter = streamFactory.getBloomFilterColumns()[columnId];
- indexStatistics = ColumnStatisticsImpl.create(schema);
- stripeColStatistics = ColumnStatisticsImpl.create(schema);
- fileStatistics = ColumnStatisticsImpl.create(schema);
- childrenWriters = new TreeWriter[0];
- rowIndex = OrcProto.RowIndex.newBuilder();
- rowIndexEntry = OrcProto.RowIndexEntry.newBuilder();
- rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry);
- stripeStatsBuilders = Lists.newArrayList();
- if (streamFactory.buildIndex()) {
- rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX);
- } else {
- rowIndexStream = null;
- }
- if (createBloomFilter) {
- bloomFilterEntry = OrcProto.BloomFilter.newBuilder();
- bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder();
- bloomFilterStream = streamFactory.createStream(id, OrcProto.Stream.Kind.BLOOM_FILTER);
- bloomFilter = new BloomFilterIO(streamFactory.getRowIndexStride(),
- streamFactory.getBloomFilterFPP());
- } else {
- bloomFilterEntry = null;
- bloomFilterIndex = null;
- bloomFilterStream = null;
- bloomFilter = null;
- }
- }
-
- protected OrcProto.RowIndex.Builder getRowIndex() {
- return rowIndex;
- }
-
- protected ColumnStatisticsImpl getStripeStatistics() {
- return stripeColStatistics;
- }
-
- protected ColumnStatisticsImpl getFileStatistics() {
- return fileStatistics;
- }
-
- protected OrcProto.RowIndexEntry.Builder getRowIndexEntry() {
- return rowIndexEntry;
- }
-
- IntegerWriter createIntegerWriter(PositionedOutputStream output,
- boolean signed, boolean isDirectV2,
- StreamFactory writer) {
- if (isDirectV2) {
- boolean alignedBitpacking = false;
- if (writer.getEncodingStrategy().equals(OrcFile.EncodingStrategy.SPEED)) {
- alignedBitpacking = true;
- }
- return new RunLengthIntegerWriterV2(output, signed, alignedBitpacking);
- } else {
- return new RunLengthIntegerWriter(output, signed);
- }
- }
-
- boolean isNewWriteFormat(StreamFactory writer) {
- return writer.getVersion() != OrcFile.Version.V_0_11;
- }
-
- /**
- * Add a new value to the column.
- * @param obj the object to write
- * @throws IOException
- */
- void write(Object obj) throws IOException {
- if (obj != null) {
- indexStatistics.increment();
- } else {
- indexStatistics.setNull();
- }
- if (isPresent != null) {
- isPresent.write(obj == null ? 0 : 1);
- if(obj == null) {
- foundNulls = true;
- }
- }
- }
-
- /**
- * Handle the top-level object write.
- *
- * This default method is used for all types except structs, which are the
- * typical case. VectorizedRowBatch assumes the top-level object is a
- * struct, so we use the first column for all other types.
- * @param batch the batch to write from
- * @param offset the row to start on
- * @param length the number of rows to write
- * @throws IOException
- */
- void writeRootBatch(VectorizedRowBatch batch, int offset,
- int length) throws IOException {
- writeBatch(batch.cols[0], offset, length);
- }
-
- /**
- * Write the values from the given vector from offset for length elements.
- * @param vector the vector to write from
- * @param offset the first value from the vector to write
- * @param length the number of values from the vector to write
- * @throws IOException
- */
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- if (vector.noNulls) {
- indexStatistics.increment(length);
- if (isPresent != null) {
- for (int i = 0; i < length; ++i) {
- isPresent.write(1);
- }
- }
- } else {
- if (vector.isRepeating) {
- boolean isNull = vector.isNull[0];
- if (isPresent != null) {
- for (int i = 0; i < length; ++i) {
- isPresent.write(isNull ? 0 : 1);
+ static void setColumn(int rowId, ColumnVector column,
+ ObjectInspector inspector, Object obj) {
+ if (obj == null) {
+ column.noNulls = false;
+ column.isNull[rowId] = true;
+ } else {
+ switch (inspector.getCategory()) {
+ case PRIMITIVE:
+ switch (((PrimitiveObjectInspector) inspector)
+ .getPrimitiveCategory()) {
+ case BOOLEAN: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] =
+ ((BooleanObjectInspector) inspector).get(obj) ? 1 : 0;
+ break;
}
- }
- if (isNull) {
- foundNulls = true;
- indexStatistics.setNull();
- } else {
- indexStatistics.increment(length);
- }
- } else {
- // count the number of non-null values
- int nonNullCount = 0;
- for(int i = 0; i < length; ++i) {
- boolean isNull = vector.isNull[i + offset];
- if (!isNull) {
- nonNullCount += 1;
+ case BYTE: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((ByteObjectInspector) inspector).get(obj);
+ break;
}
- if (isPresent != null) {
- isPresent.write(isNull ? 0 : 1);
+ case SHORT: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] =
+ ((ShortObjectInspector) inspector).get(obj);
+ break;
+ }
+ case INT: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((IntObjectInspector) inspector).get(obj);
+ break;
+ }
+ case LONG: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((LongObjectInspector) inspector).get(obj);
+ break;
+ }
+ case FLOAT: {
+ DoubleColumnVector vector = (DoubleColumnVector) column;
+ vector.vector[rowId] =
+ ((FloatObjectInspector) inspector).get(obj);
+ break;
+ }
+ case DOUBLE: {
+ DoubleColumnVector vector = (DoubleColumnVector) column;
+ vector.vector[rowId] =
+ ((DoubleObjectInspector) inspector).get(obj);
+ break;
+ }
+ case BINARY: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ BytesWritable blob = ((BinaryObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj);
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case STRING: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Text blob = ((StringObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj);
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case VARCHAR: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Text blob = ((HiveVarcharObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj).getTextValue();
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case CHAR: {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Text blob = ((HiveCharObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj).getTextValue();
+ vector.setVal(rowId, blob.getBytes(), 0, blob.getLength());
+ break;
+ }
+ case TIMESTAMP: {
+ LongColumnVector vector = (LongColumnVector) column;
+ Timestamp ts = ((TimestampObjectInspector) inspector)
+ .getPrimitiveJavaObject(obj);
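+ // getTime() already carries the integral milliseconds, so only the
+ // sub-millisecond remainder of getNanos() is added back, yielding
+ // nanoseconds since the epoch.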
+ vector.vector[rowId] = ts.getTime() * NANOS_PER_MILLI +
+ (ts.getNanos() % NANOS_PER_MILLI);
+ break;
+ }
+ case DATE: {
+ LongColumnVector vector = (LongColumnVector) column;
+ vector.vector[rowId] = ((DateObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj).getDays();
+ break;
+ }
+ case DECIMAL: {
+ DecimalColumnVector vector = (DecimalColumnVector) column;
+ vector.set(rowId, ((HiveDecimalObjectInspector) inspector)
+ .getPrimitiveWritableObject(obj));
+ break;
}
}
- indexStatistics.increment(nonNullCount);
- if (nonNullCount != length) {
- foundNulls = true;
- indexStatistics.setNull();
- }
- }
- }
- }
-
- private void removeIsPresentPositions() {
- for(int i=0; i < rowIndex.getEntryCount(); ++i) {
- OrcProto.RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
- List<Long> positions = entry.getPositionsList();
- // bit streams use 3 positions if uncompressed, 4 if compressed
- positions = positions.subList(isCompressed ? 4 : 3, positions.size());
- entry.clearPositions();
- entry.addAllPositions(positions);
- }
- }
-
- /**
- * Write the stripe out to the file.
- * @param builder the stripe footer that contains the information about the
- * layout of the stripe. The TreeWriter is required to update
- * the footer with its information.
- * @param requiredIndexEntries the number of index entries that are
- * required. This is to check that the
- * row index is well formed.
- * @throws IOException
- */
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- if (isPresent != null) {
- isPresent.flush();
-
- // if no nulls are found in a stream, then suppress the stream
- if(!foundNulls) {
- isPresentOutStream.suppress();
- // since isPresent bitstream is suppressed, update the index to
- // remove the positions of the isPresent stream
- if (rowIndexStream != null) {
- removeIsPresentPositions();
+ break;
+ case STRUCT: {
+ StructColumnVector vector = (StructColumnVector) column;
+ StructObjectInspector oi = (StructObjectInspector) inspector;
+ List<? extends StructField> fields = oi.getAllStructFieldRefs();
+ for (int c = 0; c < vector.fields.length; ++c) {
+ StructField field = fields.get(c);
+ setColumn(rowId, vector.fields[c], field.getFieldObjectInspector(),
+ oi.getStructFieldData(obj, field));
}
+ break;
}
- }
-
- // merge stripe-level column statistics to file statistics and write it to
- // stripe statistics
- OrcProto.StripeStatistics.Builder stripeStatsBuilder = OrcProto.StripeStatistics.newBuilder();
- writeStripeStatistics(stripeStatsBuilder, this);
- stripeStatsBuilders.add(stripeStatsBuilder);
-
- // reset the flag for next stripe
- foundNulls = false;
-
- builder.addColumns(getEncoding());
- if (streamFactory.hasWriterTimeZone()) {
- builder.setWriterTimezone(TimeZone.getDefault().getID());
- }
- if (rowIndexStream != null) {
- if (rowIndex.getEntryCount() != requiredIndexEntries) {
- throw new IllegalArgumentException("Column has wrong number of " +
- "index entries found: " + rowIndex.getEntryCount() + " expected: " +
- requiredIndexEntries);
+ case UNION: {
+ UnionColumnVector vector = (UnionColumnVector) column;
+ UnionObjectInspector oi = (UnionObjectInspector) inspector;
+ int tag = oi.getTag(obj);
+ vector.tags[rowId] = tag;
+ setColumn(rowId, vector.fields[tag],
+ oi.getObjectInspectors().get(tag), oi.getField(obj));
+ break;
}
- rowIndex.build().writeTo(rowIndexStream);
- rowIndexStream.flush();
- }
- rowIndex.clear();
- rowIndexEntry.clear();
-
- // write the bloom filter to out stream
- if (bloomFilterStream != null) {
- bloomFilterIndex.build().writeTo(bloomFilterStream);
- bloomFilterStream.flush();
- bloomFilterIndex.clear();
- bloomFilterEntry.clear();
- }
- }
-
- private void writeStripeStatistics(OrcProto.StripeStatistics.Builder builder,
- TreeWriter treeWriter) {
- treeWriter.fileStatistics.merge(treeWriter.stripeColStatistics);
- builder.addColStats(treeWriter.stripeColStatistics.serialize().build());
- treeWriter.stripeColStatistics.reset();
- for (TreeWriter child : treeWriter.getChildrenWriters()) {
- writeStripeStatistics(builder, child);
- }
- }
-
- TreeWriter[] getChildrenWriters() {
- return childrenWriters;
- }
-
- /**
- * Get the encoding for this column.
- * @return the information about the encoding of this column
- */
- OrcProto.ColumnEncoding getEncoding() {
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- /**
- * Create a row index entry with the previous location and the current
- * index statistics. Also merges the index statistics into the file
- * statistics before they are cleared. Finally, it records the start of the
- * next index and ensures all of the children columns also create an entry.
- * @throws IOException
- */
- void createRowIndexEntry() throws IOException {
- stripeColStatistics.merge(indexStatistics);
- rowIndexEntry.setStatistics(indexStatistics.serialize());
- indexStatistics.reset();
- rowIndex.addEntry(rowIndexEntry);
- rowIndexEntry.clear();
- addBloomFilterEntry();
- recordPosition(rowIndexPosition);
- for(TreeWriter child: childrenWriters) {
- child.createRowIndexEntry();
- }
- }
-
- void addBloomFilterEntry() {
- if (createBloomFilter) {
- bloomFilterEntry.setNumHashFunctions(bloomFilter.getNumHashFunctions());
- bloomFilterEntry.addAllBitset(Longs.asList(bloomFilter.getBitSet()));
- bloomFilterIndex.addBloomFilter(bloomFilterEntry.build());
- bloomFilter.reset();
- bloomFilterEntry.clear();
- }
- }
-
- /**
- * Record the current position in each of this column's streams.
- * @param recorder where should the locations be recorded
- * @throws IOException
- */
- void recordPosition(PositionRecorder recorder) throws IOException {
- if (isPresent != null) {
- isPresent.getPosition(recorder);
- }
- }
-
- /**
- * Estimate how much memory the writer is consuming excluding the streams.
- * @return the number of bytes.
- */
- long estimateMemory() {
- long result = 0;
- for (TreeWriter child: childrenWriters) {
- result += child.estimateMemory();
- }
- return result;
- }
- }
-
- private static class BooleanTreeWriter extends TreeWriter {
- private final BitFieldWriter writer;
-
- BooleanTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- PositionedOutputStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.writer = new BitFieldWriter(out, 1);
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- boolean val = ((BooleanObjectInspector) inspector).get(obj);
- indexStatistics.updateBoolean(val, 1);
- writer.write(val ? 1 : 0);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int value = vec.vector[0] == 0 ? 0 : 1;
- indexStatistics.updateBoolean(value != 0, length);
- for(int i=0; i < length; ++i) {
- writer.write(value);
+ case LIST: {
+ ListColumnVector vector = (ListColumnVector) column;
+ ListObjectInspector oi = (ListObjectInspector) inspector;
+ int offset = vector.childCount;
+ int length = oi.getListLength(obj);
+ vector.offsets[rowId] = offset;
+ vector.lengths[rowId] = length;
+ vector.child.ensureSize(offset + length, true);
+ vector.childCount += length;
+ for (int c = 0; c < length; ++c) {
+ setColumn(offset + c, vector.child,
+ oi.getListElementObjectInspector(),
+ oi.getListElement(obj, c));
}
+ break;
}
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int value = vec.vector[i + offset] == 0 ? 0 : 1;
- writer.write(value);
- indexStatistics.updateBoolean(value != 0, 1);
+ case MAP: {
+ MapColumnVector vector = (MapColumnVector) column;
+ MapObjectInspector oi = (MapObjectInspector) inspector;
+ int offset = vector.childCount;
+ Set map = oi.getMap(obj).entrySet();
+ int length = map.size();
+ vector.offsets[rowId] = offset;
+ vector.lengths[rowId] = length;
+ vector.keys.ensureSize(offset + length, true);
+ vector.values.ensureSize(offset + length, true);
+ vector.childCount += length;
+ for (Object item: map) {
+ Map.Entry pair = (Map.Entry) item;
+ setColumn(offset, vector.keys, oi.getMapKeyObjectInspector(),
+ pair.getKey());
+ setColumn(offset, vector.values, oi.getMapValueObjectInspector(),
+ pair.getValue());
+ offset += 1;
}
+ break;
}
+ default:
+ throw new IllegalArgumentException("Unknown ObjectInspector kind " +
+ inspector.getCategory());
}
}
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
}
- private static class ByteTreeWriter extends TreeWriter {
- private final RunLengthByteWriter writer;
-
- ByteTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.writer = new RunLengthByteWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA));
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- byte val = ((ByteObjectInspector) inspector).get(obj);
- indexStatistics.updateInteger(val, 1);
- if (createBloomFilter) {
- bloomFilter.addLong(val);
- }
- writer.write(val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- byte value = (byte) vec.vector[0];
- indexStatistics.updateInteger(value, length);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- byte value = (byte) vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateInteger(value, 1);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
+ void flushInternalBatch() throws IOException {
+ if (internalBatch.size != 0) {
+ super.addRowBatch(internalBatch);
+ internalBatch.reset();
}
}
- private static class IntegerTreeWriter extends TreeWriter {
- private final IntegerWriter writer;
- private final ShortObjectInspector shortInspector;
- private final IntObjectInspector intInspector;
- private final LongObjectInspector longInspector;
- private boolean isDirectV2 = true;
-
- IntegerTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- OutStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- if (inspector instanceof IntObjectInspector) {
- intInspector = (IntObjectInspector) inspector;
- shortInspector = null;
- longInspector = null;
- } else {
- intInspector = null;
- if (inspector instanceof LongObjectInspector) {
- longInspector = (LongObjectInspector) inspector;
- shortInspector = null;
- } else {
- shortInspector = (ShortObjectInspector) inspector;
- longInspector = null;
- }
- }
- recordPosition(rowIndexPosition);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- long val;
- if (intInspector != null) {
- val = intInspector.get(obj);
- } else if (longInspector != null) {
- val = longInspector.get(obj);
- } else {
- val = shortInspector.get(obj);
- }
- indexStatistics.updateInteger(val, 1);
- if (createBloomFilter) {
- // integers are converted to longs in column statistics and during SARG evaluation
- bloomFilter.addLong(val);
- }
- writer.write(val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- long value = vec.vector[0];
- indexStatistics.updateInteger(value, length);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- long value = vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateInteger(value, 1);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- }
- }
+ @Override
+ public void addRow(Object row) throws IOException {
+ int rowId = internalBatch.size++;
+ if (fields != null) {
+ StructObjectInspector soi = (StructObjectInspector) inspector;
+ for(int i=0; i < fields.length; ++i) {
+ setColumn(rowId, internalBatch.cols[i],
+ fields[i].getFieldObjectInspector(),
+ soi.getStructFieldData(row, fields[i]));
}
+ } else {
+ setColumn(rowId, internalBatch.cols[0], inspector, row);
}
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
+ if (internalBatch.size == internalBatch.getMaxSize()) {
+ flushInternalBatch();
}
}
- private static class FloatTreeWriter extends TreeWriter {
- private final PositionedOutputStream stream;
- private final SerializationUtils utils;
+ @Override
+ public long writeIntermediateFooter() throws IOException {
+ flushInternalBatch();
+ return super.writeIntermediateFooter();
+ }
- FloatTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.utils = new SerializationUtils();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- float val = ((FloatObjectInspector) inspector).get(obj);
- indexStatistics.updateDouble(val);
- if (createBloomFilter) {
- // floats are converted to doubles in column statistics and during SARG evaluation
- bloomFilter.addDouble(val);
- }
- utils.writeFloat(stream, val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DoubleColumnVector vec = (DoubleColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- float value = (float) vec.vector[0];
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- for(int i=0; i < length; ++i) {
- utils.writeFloat(stream, value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- float value = (float) vec.vector[i + offset];
- utils.writeFloat(stream, value);
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- }
- }
- }
- }
-
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- stream.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- }
- }
-
- private static class DoubleTreeWriter extends TreeWriter {
- private final PositionedOutputStream stream;
- private final SerializationUtils utils;
-
- DoubleTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.utils = new SerializationUtils();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- double val = ((DoubleObjectInspector) inspector).get(obj);
- indexStatistics.updateDouble(val);
- if (createBloomFilter) {
- bloomFilter.addDouble(val);
- }
- utils.writeDouble(stream, val);
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DoubleColumnVector vec = (DoubleColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- double value = vec.vector[0];
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- for(int i=0; i < length; ++i) {
- utils.writeDouble(stream, value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- double value = vec.vector[i + offset];
- utils.writeDouble(stream, value);
- indexStatistics.updateDouble(value);
- if (createBloomFilter) {
- bloomFilter.addDouble(value);
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- stream.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- }
- }
-
- private static abstract class StringBaseTreeWriter extends TreeWriter {
- private static final int INITIAL_DICTIONARY_SIZE = 4096;
- private final OutStream stringOutput;
- private final IntegerWriter lengthOutput;
- private final IntegerWriter rowOutput;
- protected final StringRedBlackTree dictionary =
- new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
- protected final DynamicIntArray rows = new DynamicIntArray();
- protected final PositionedOutputStream directStreamOutput;
- protected final IntegerWriter directLengthOutput;
- private final List<OrcProto.RowIndexEntry> savedRowIndex =
- new ArrayList<OrcProto.RowIndexEntry>();
- private final boolean buildIndex;
- private final List<Long> rowIndexValueCount = new ArrayList<Long>();
- // If the number of keys in a dictionary is greater than this fraction of
- // the total number of non-null rows, turn off dictionary encoding
- private final double dictionaryKeySizeThreshold;
- protected boolean useDictionaryEncoding = true;
- private boolean isDirectV2 = true;
- private boolean doneDictionaryCheck;
- private final boolean strideDictionaryCheck;
-
- StringBaseTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- stringOutput = writer.createStream(id,
- OrcProto.Stream.Kind.DICTIONARY_DATA);
- lengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- rowOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(0L);
- buildIndex = writer.buildIndex();
- directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- directLengthOutput = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- Configuration conf = writer.getConfiguration();
- dictionaryKeySizeThreshold =
- OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
- strideDictionaryCheck =
- OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
- doneDictionaryCheck = false;
- }
-
- /**
- * Method to retrieve text values from the value object, which can be overridden
- * by subclasses.
- * @param obj value
- * @return Text text value from obj
- */
- Text getTextValue(Object obj) {
- return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- Text val = getTextValue(obj);
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(val));
- } else {
- // write data and length
- directStreamOutput.write(val.getBytes(), 0, val.getLength());
- directLengthOutput.write(val.getLength());
- }
- indexStatistics.updateString(val);
- if (createBloomFilter) {
- bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
- }
- }
- }
-
- private boolean checkDictionaryEncoding() {
- if (!doneDictionaryCheck) {
- // Set the flag for whether to use dictionary encoding, based on
- // whether the fraction of distinct keys over the number of non-null
- // rows is at most the configured threshold
- float ratio = rows.size() > 0 ? (float) (dictionary.size()) / rows.size() : 0.0f;
- useDictionaryEncoding = !isDirectV2 || ratio <= dictionaryKeySizeThreshold;
- doneDictionaryCheck = true;
- }
- return useDictionaryEncoding;
- }
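
The check above reduces to a single ratio test over counts the writer
already tracks. A standalone sketch (method and parameter names are
illustrative):

    // Dictionary encoding is kept only while the number of distinct keys
    // stays at or below the configured fraction of non-null rows.
    static boolean keepDictionary(int distinctKeys, int nonNullRows,
                                  double threshold) {
      float ratio = nonNullRows > 0
          ? (float) distinctKeys / nonNullRows : 0.0f;
      return ratio <= threshold;
    }

With the default threshold of 0.8 from OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD,
a column with 900 distinct values in 1,000 non-null rows (ratio 0.9) falls
back to direct encoding.
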
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- // if the stripe has fewer rows than dictionaryCheckAfterRows, the
- // dictionary check will not have happened yet, so do it here
- checkDictionaryEncoding();
-
- if (useDictionaryEncoding) {
- flushDictionary();
- } else {
- // flush out any leftover entries from the dictionary
- if (rows.size() > 0) {
- flushDictionary();
- }
-
- // suppress the stream for every stripe if dictionary is disabled
- stringOutput.suppress();
- }
-
- // we need to build the row index before calling super, since it
- // writes it out.
- super.writeStripe(builder, requiredIndexEntries);
- stringOutput.flush();
- lengthOutput.flush();
- rowOutput.flush();
- directStreamOutput.flush();
- directLengthOutput.flush();
- // reset all of the fields to be ready for the next stripe.
- dictionary.clear();
- savedRowIndex.clear();
- rowIndexValueCount.clear();
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(0L);
-
- if (!useDictionaryEncoding) {
- // record the start positions of the first index stride of the next
- // stripe, i.e. the beginning of the direct streams, when dictionary
- // encoding is disabled
- recordDirectStreamPosition();
- }
- }
-
- private void flushDictionary() throws IOException {
- final int[] dumpOrder = new int[dictionary.size()];
-
- if (useDictionaryEncoding) {
- // Write the dictionary by traversing the red-black tree, writing out
- // the bytes and lengths, and creating the map from the original order
- // to the final sorted order.
-
- dictionary.visit(new StringRedBlackTree.Visitor() {
- private int currentId = 0;
- @Override
- public void visit(StringRedBlackTree.VisitorContext context
- ) throws IOException {
- context.writeBytes(stringOutput);
- lengthOutput.write(context.getLength());
- dumpOrder[context.getOriginalPosition()] = currentId++;
- }
- });
- } else {
- // for direct encoding, we don't want the dictionary data stream
- stringOutput.suppress();
- }
- int length = rows.size();
- int rowIndexEntry = 0;
- OrcProto.RowIndex.Builder rowIndex = getRowIndex();
- Text text = new Text();
- // write the values translated into the dump order.
- for(int i = 0; i <= length; ++i) {
- // now that we are writing out the row values, we can finalize the
- // row index
- if (buildIndex) {
- while (i == rowIndexValueCount.get(rowIndexEntry) &&
- rowIndexEntry < savedRowIndex.size()) {
- OrcProto.RowIndexEntry.Builder base =
- savedRowIndex.get(rowIndexEntry++).toBuilder();
- if (useDictionaryEncoding) {
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- } else {
- PositionRecorder posn = new RowIndexPositionRecorder(base);
- directStreamOutput.getPosition(posn);
- directLengthOutput.getPosition(posn);
- }
- rowIndex.addEntry(base.build());
- }
- }
- if (i != length) {
- if (useDictionaryEncoding) {
- rowOutput.write(dumpOrder[rows.get(i)]);
- } else {
- dictionary.getText(text, rows.get(i));
- directStreamOutput.write(text.getBytes(), 0, text.getLength());
- directLengthOutput.write(text.getLength());
- }
- }
- }
- rows.clear();
- }
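
The central trick in flushDictionary is the dumpOrder remapping: keys were
assigned ids in insertion order as rows arrived, but the dictionary stream
is written in sorted order, so every buffered row id has to be translated.
Here is the same remapping with a TreeMap standing in for the
StringRedBlackTree (a sketch, assuming distinct keys in first-seen order):

    import java.util.List;
    import java.util.TreeMap;

    class DumpOrderSketch {
      // Maps each key's original insertion id to its sorted position,
      // mirroring the visitor above.
      static int[] dumpOrder(List<String> keys) {
        TreeMap<String, Integer> sorted = new TreeMap<>();
        for (int i = 0; i < keys.size(); ++i) {
          sorted.put(keys.get(i), i);
        }
        int[] dumpOrder = new int[keys.size()];
        int currentId = 0;
        for (int originalPosition : sorted.values()) {
          dumpOrder[originalPosition] = currentId++;
        }
        return dumpOrder;
      }
    }

For keys added as ["banana", "apple"], dumpOrder is {1, 0}: a row recorded
with insertion id 0 (banana) is rewritten as sorted id 1.
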
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- // Returns the encoding used for the last call to writeStripe
- if (useDictionaryEncoding) {
- if(isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DICTIONARY_V2).
- setDictionarySize(dictionary.size()).build();
- }
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DICTIONARY).
- setDictionarySize(dictionary.size()).build();
- } else {
- if(isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder().setKind(
- OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
- }
-
- /**
- * This method doesn't call the super method, because unlike most of the
- * other TreeWriters, this one can't record the position in the streams
- * until the stripe is being flushed. Therefore it saves all of the entries
- * and augments them with the final information as the stripe is written.
- * @throws IOException
- */
- @Override
- void createRowIndexEntry() throws IOException {
- getStripeStatistics().merge(indexStatistics);
- OrcProto.RowIndexEntry.Builder rowIndexEntry = getRowIndexEntry();
- rowIndexEntry.setStatistics(indexStatistics.serialize());
- indexStatistics.reset();
- OrcProto.RowIndexEntry base = rowIndexEntry.build();
- savedRowIndex.add(base);
- rowIndexEntry.clear();
- addBloomFilterEntry();
- recordPosition(rowIndexPosition);
- rowIndexValueCount.add(Long.valueOf(rows.size()));
- if (strideDictionaryCheck) {
- checkDictionaryEncoding();
- }
- if (!useDictionaryEncoding) {
- if (rows.size() > 0) {
- flushDictionary();
- // just record the start positions of the next index stride
- recordDirectStreamPosition();
- } else {
- // record the start positions of the next index stride
- recordDirectStreamPosition();
- getRowIndex().addEntry(base);
- }
- }
- }
-
- private void recordDirectStreamPosition() throws IOException {
- directStreamOutput.getPosition(rowIndexPosition);
- directLengthOutput.getPosition(rowIndexPosition);
- }
-
- @Override
- long estimateMemory() {
- return rows.getSizeInBytes() + dictionary.getSizeInBytes();
- }
- }
-
- private static class StringTreeWriter extends StringBaseTreeWriter {
- StringTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- if (useDictionaryEncoding) {
- int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(vec.vector[0], vec.start[0],
- vec.length[0]);
- directLengthOutput.write(vec.length[0]);
- }
- }
- indexStatistics.updateString(vec.vector[0], vec.start[0],
- vec.length[0], length);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]));
- } else {
- directStreamOutput.write(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- directLengthOutput.write(vec.length[offset + i]);
- }
- indexStatistics.updateString(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i], 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
- }
-
- /**
- * Under the covers, char is written to ORC the same way as string.
- */
- private static class CharTreeWriter extends StringBaseTreeWriter {
- private final int itemLength;
- private final byte[] padding;
-
- CharTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- itemLength = schema.getMaxLength();
- padding = new byte[itemLength];
- }
-
- /**
- * Override base class implementation to support char values.
- */
- @Override
- Text getTextValue(Object obj) {
- return (((HiveCharObjectInspector) inspector)
- .getPrimitiveWritableObject(obj)).getTextValue();
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- byte[] ptr;
- int ptrOffset;
- if (vec.length[0] >= itemLength) {
- ptr = vec.vector[0];
- ptrOffset = vec.start[0];
- } else {
- ptr = padding;
- ptrOffset = 0;
- System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
- vec.length[0]);
- Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
- }
- if (useDictionaryEncoding) {
- int id = dictionary.add(ptr, ptrOffset, itemLength);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(ptr, ptrOffset, itemLength);
- directLengthOutput.write(itemLength);
- }
- }
- indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
- if (createBloomFilter) {
- bloomFilter.addBytes(ptr, ptrOffset, itemLength);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- byte[] ptr;
- int ptrOffset;
- if (vec.length[offset + i] >= itemLength) {
- ptr = vec.vector[offset + i];
- ptrOffset = vec.start[offset + i];
- } else {
- // the value is shorter than the declared length, so copy and pad it
- ptr = padding;
- ptrOffset = 0;
- System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
- ptr, 0, vec.length[offset + i]);
- Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
- }
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(ptr, ptrOffset, itemLength));
- } else {
- directStreamOutput.write(ptr, ptrOffset, itemLength);
- directLengthOutput.write(itemLength);
- }
- indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(ptr, ptrOffset, itemLength);
- }
- }
- }
- }
- }
- }
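
The byte-level work in CharTreeWriter is normalizing every value to exactly
the declared CHAR(n) width: shorter values are space-padded, and longer
ones are written as their first n bytes. A sketch of that normalization in
isolation (the method name is illustrative):

    import java.util.Arrays;

    // Pads or truncates a value to exactly itemLength bytes, as the CHAR
    // writer above does; the padding byte is an ASCII space.
    static byte[] toFixedWidth(byte[] bytes, int start, int length,
                               int itemLength) {
      byte[] result = new byte[itemLength];
      int copied = Math.min(length, itemLength);
      System.arraycopy(bytes, start, result, 0, copied);
      Arrays.fill(result, copied, itemLength, (byte) ' ');
      return result;
    }
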
-
- /**
- * Under the covers, varchar is written to ORC the same way as string.
- */
- private static class VarcharTreeWriter extends StringBaseTreeWriter {
- private final int maxLength;
-
- VarcharTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- maxLength = schema.getMaxLength();
- }
-
- /**
- * Override base class implementation to support varchar values.
- */
- @Override
- Text getTextValue(Object obj) {
- return (((HiveVarcharObjectInspector) inspector)
- .getPrimitiveWritableObject(obj)).getTextValue();
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int itemLength = Math.min(vec.length[0], maxLength);
- if (useDictionaryEncoding) {
- int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
- for(int i=0; i < length; ++i) {
- rows.add(id);
- }
- } else {
- for(int i=0; i < length; ++i) {
- directStreamOutput.write(vec.vector[0], vec.start[0],
- itemLength);
- directLengthOutput.write(itemLength);
- }
- }
- indexStatistics.updateString(vec.vector[0], vec.start[0],
- itemLength, length);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int itemLength = Math.min(vec.length[offset + i], maxLength);
- if (useDictionaryEncoding) {
- rows.add(dictionary.add(vec.vector[offset + i],
- vec.start[offset + i], itemLength));
- } else {
- directStreamOutput.write(vec.vector[offset + i],
- vec.start[offset + i], itemLength);
- directLengthOutput.write(itemLength);
- }
- indexStatistics.updateString(vec.vector[offset + i],
- vec.start[offset + i], itemLength, 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], itemLength);
- }
- }
- }
- }
- }
- }
-
- private static class BinaryTreeWriter extends TreeWriter {
- private final PositionedOutputStream stream;
- private final IntegerWriter length;
- private boolean isDirectV2 = true;
-
- BinaryTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.stream = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.length = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- BytesWritable val =
- ((BinaryObjectInspector) inspector).getPrimitiveWritableObject(obj);
- stream.write(val.getBytes(), 0, val.getLength());
- length.write(val.getLength());
- indexStatistics.updateBinary(val);
- if (createBloomFilter) {
- bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
- }
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- BytesColumnVector vec = (BytesColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- for(int i=0; i < length; ++i) {
- stream.write(vec.vector[0], vec.start[0],
- vec.length[0]);
- this.length.write(vec.length[0]);
- }
- indexStatistics.updateBinary(vec.vector[0], vec.start[0],
- vec.length[0], length);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- stream.write(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- this.length.write(vec.length[offset + i]);
- indexStatistics.updateBinary(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i], 1);
- if (createBloomFilter) {
- bloomFilter.addBytes(vec.vector[offset + i],
- vec.start[offset + i], vec.length[offset + i]);
- }
- }
- }
- }
- }
-
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- stream.flush();
- length.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- stream.getPosition(recorder);
- length.getPosition(recorder);
- }
- }
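
Binary columns use the same two-stream layout visible in the constructor
above: the values' raw bytes are concatenated on the DATA stream and each
value's byte count goes to the LENGTH stream. A reader-side sketch of how
the two streams recombine (names are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Re-splits a concatenated DATA stream using the LENGTH stream.
    static List<byte[]> splitValues(byte[] data, int[] lengths) {
      List<byte[]> values = new ArrayList<>();
      int pos = 0;
      for (int len : lengths) {
        values.add(Arrays.copyOfRange(data, pos, pos + len));
        pos += len;
      }
      return values;
    }
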
-
- static final int MILLIS_PER_SECOND = 1000;
- static final int NANOS_PER_SECOND = 1000000000;
- static final int MILLIS_PER_NANO = 1000000;
- static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
-
- private static class TimestampTreeWriter extends TreeWriter {
- private final IntegerWriter seconds;
- private final IntegerWriter nanos;
- private final boolean isDirectV2;
- private final long base_timestamp;
-
- TimestampTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.seconds = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.DATA), true, isDirectV2, writer);
- this.nanos = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- // for unit tests to set different time zones
- this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND;
- writer.useWriterTimeZone(true);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- Timestamp val =
- ((TimestampObjectInspector) inspector).
- getPrimitiveJavaObject(obj);
- indexStatistics.updateTimestamp(val);
- seconds.write((val.getTime() / MILLIS_PER_SECOND) - base_timestamp);
- nanos.write(formatNanos(val.getNanos()));
- if (createBloomFilter) {
- bloomFilter.addLong(val.getTime());
- }
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- long value = vec.vector[0];
- long valueMillis = value / MILLIS_PER_NANO;
- indexStatistics.updateTimestamp(valueMillis);
- if (createBloomFilter) {
- bloomFilter.addLong(valueMillis);
- }
- final long secs = value / NANOS_PER_SECOND - base_timestamp;
- // match the non-repeating branch below: keep nanos non-negative
- int repeatNanos = (int) (value % NANOS_PER_SECOND);
- if (repeatNanos < 0) {
-   repeatNanos += NANOS_PER_SECOND;
- }
- final long nano = formatNanos(repeatNanos);
- for(int i=0; i < length; ++i) {
- seconds.write(secs);
- nanos.write(nano);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- long value = vec.vector[i + offset];
- long valueMillis = value / MILLIS_PER_NANO;
- long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
- int valueNanos = (int) (value % NANOS_PER_SECOND);
- if (valueNanos < 0) {
- valueNanos += NANOS_PER_SECOND;
- }
- seconds.write(valueSecs);
- nanos.write(formatNanos(valueNanos));
- indexStatistics.updateTimestamp(valueMillis);
- if (createBloomFilter) {
- bloomFilter.addLong(valueMillis);
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- seconds.flush();
- nanos.flush();
- recordPosition(rowIndexPosition);
- }
-
- private static long formatNanos(int nanos) {
- if (nanos == 0) {
- return 0;
- } else if (nanos % 100 != 0) {
- return ((long) nanos) << 3;
- } else {
- nanos /= 100;
- int trailingZeros = 1;
- while (nanos % 10 == 0 && trailingZeros < 7) {
- nanos /= 10;
- trailingZeros += 1;
- }
- return ((long) nanos) << 3 | trailingZeros;
- }
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- seconds.getPosition(recorder);
- nanos.getPosition(recorder);
- }
- }
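
formatNanos above packs a trailing-zero count into the low three bits so
that common round fractions (milliseconds, microseconds) encode as small
integers. The reader-side inverse scales the value back up by 10^(z+1)
whenever the stored zero count z is non-zero; a sketch matching the
decoder's logic:

    // Inverse of formatNanos: the low 3 bits are the zero count, the rest
    // is the significant digits of the nanosecond value.
    static int parseNanos(long serialized) {
      int zeros = 7 & (int) serialized;
      int result = (int) (serialized >>> 3);
      if (zeros != 0) {
        for (int i = 0; i <= zeros; ++i) {
          result *= 10;
        }
      }
      return result;
    }

For example, formatNanos(500000) strips the trailing zeros down to 5,
records z = 4 (the first division is by 100, the next three by 10), and
returns (5 << 3) | 4 == 44; parseNanos(44) multiplies 5 by 10^5 to recover
500000.
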
-
- private static class DateTreeWriter extends TreeWriter {
- private final IntegerWriter writer;
- private final boolean isDirectV2;
-
- DateTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- OutStream out = writer.createStream(id,
- OrcProto.Stream.Kind.DATA);
- this.isDirectV2 = isNewWriteFormat(writer);
- this.writer = createIntegerWriter(out, true, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- // Using the Writable here as it's used directly for writing as well as for stats.
- DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
- indexStatistics.updateDate(val);
- writer.write(val.getDays());
- if (createBloomFilter) {
- bloomFilter.addLong(val.getDays());
- }
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- LongColumnVector vec = (LongColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- int value = (int) vec.vector[0];
- indexStatistics.updateDate(value);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- for(int i=0; i < length; ++i) {
- writer.write(value);
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- int value = (int) vec.vector[i + offset];
- writer.write(value);
- indexStatistics.updateDate(value);
- if (createBloomFilter) {
- bloomFilter.addLong(value);
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- writer.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- writer.getPosition(recorder);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
- }
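
Dates are serialized as a single signed integer: the number of days since
the Unix epoch, which is exactly what DateWritable.getDays() returns above.
A quick check of the representation:

    import java.time.LocalDate;

    // 2015-01-01 is 16436 days after 1970-01-01.
    long days = LocalDate.of(2015, 1, 1).toEpochDay();   // 16436
    LocalDate roundTrip = LocalDate.ofEpochDay(days);    // 2015-01-01
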
-
- private static class DecimalTreeWriter extends TreeWriter {
- private final PositionedOutputStream valueStream;
- private final IntegerWriter scaleStream;
- private final boolean isDirectV2;
-
- DecimalTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- this.isDirectV2 = isNewWriteFormat(writer);
- valueStream = writer.createStream(id, OrcProto.Stream.Kind.DATA);
- this.scaleStream = createIntegerWriter(writer.createStream(id,
- OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer);
- recordPosition(rowIndexPosition);
- }
-
- @Override
- OrcProto.ColumnEncoding getEncoding() {
- if (isDirectV2) {
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT_V2).build();
- }
- return OrcProto.ColumnEncoding.newBuilder()
- .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build();
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- HiveDecimal decimal = ((HiveDecimalObjectInspector) inspector).
- getPrimitiveJavaObject(obj);
- if (decimal == null) {
- return;
- }
- SerializationUtils.writeBigInteger(valueStream,
- decimal.unscaledValue());
- scaleStream.write(decimal.scale());
- indexStatistics.updateDecimal(decimal);
- if (createBloomFilter) {
- bloomFilter.addString(decimal.toString());
- }
- }
- }
-
- @Override
- void writeBatch(ColumnVector vector, int offset,
- int length) throws IOException {
- super.writeBatch(vector, offset, length);
- DecimalColumnVector vec = (DecimalColumnVector) vector;
- if (vector.isRepeating) {
- if (vector.noNulls || !vector.isNull[0]) {
- HiveDecimal value = vec.vector[0].getHiveDecimal();
- indexStatistics.updateDecimal(value);
- if (createBloomFilter) {
- bloomFilter.addString(value.toString());
- }
- for(int i=0; i < length; ++i) {
- SerializationUtils.writeBigInteger(valueStream,
- value.unscaledValue());
- scaleStream.write(value.scale());
- }
- }
- } else {
- for(int i=0; i < length; ++i) {
- if (vec.noNulls || !vec.isNull[i + offset]) {
- HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
- SerializationUtils.writeBigInteger(valueStream,
- value.unscaledValue());
- scaleStream.write(value.scale());
- indexStatistics.updateDecimal(value);
- if (createBloomFilter) {
- bloomFilter.addString(value.toString());
- }
- }
- }
- }
- }
-
- @Override
- void writeStripe(OrcProto.StripeFooter.Builder builder,
- int requiredIndexEntries) throws IOException {
- super.writeStripe(builder, requiredIndexEntries);
- valueStream.flush();
- scaleStream.flush();
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void recordPosition(PositionRecorder recorder) throws IOException {
- super.recordPosition(recorder);
- valueStream.getPosition(recorder);
- scaleStream.getPosition(recorder);
- }
- }
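
Decimals are split across the two streams created in the constructor above:
the unscaled value is written to DATA as a variable-length BigInteger and
the scale to SECONDARY. The same decomposition in plain Java:

    import java.math.BigDecimal;
    import java.math.BigInteger;

    // 12.345 is stored as unscaled value 12345 (DATA) plus scale 3
    // (SECONDARY); together they reconstruct 12345 * 10^-3.
    BigDecimal value = new BigDecimal("12.345");
    BigInteger unscaled = value.unscaledValue();  // 12345
    int scale = value.scale();                    // 3
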
-
- private static class StructTreeWriter extends TreeWriter {
- private final List<? extends StructField> fields;
- StructTreeWriter(int columnId,
- ObjectInspector inspector,
- TypeDescription schema,
- StreamFactory writer,
- boolean nullable) throws IOException {
- super(columnId, inspector, schema, writer, nullable);
- List<TypeDescription> children = schema.getChildren();
- if (inspector != null) {
- StructObjectInspector structObjectInspector =
- (StructObjectInspector) inspector;
- fields = structObjectInspector.getAllStructFieldRefs();
- } else {
- fields = null;
- }
- childrenWriters = new TreeWriter[children.size()];
- for(int i=0; i < childrenWriters.length; ++i) {
- ObjectInspector childOI;
- if (fields != null && i < fields.size()) {
- childOI = fields.get(i).getFieldObjectInspector();
- } else {
- childOI = null;
- }
- childrenWriters[i] = createTreeWriter(
- childOI, children.get(i), writer,
- true);
- }
- recordPosition(rowIndexPosition);
- }
-
- @Override
- void write(Object obj) throws IOException {
- super.write(obj);
- if (obj != null) {
- StructObjectInspector insp = (StructObjectInspecto
<TRUNCATED>