Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:40:49 UTC
[07/34] hive git commit: HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)
HIVE-12054. Create vectorized ORC write method. (omalley reviewed by prasanthj)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f5f9f30d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f5f9f30d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f5f9f30d
Branch: refs/heads/master-fixed
Commit: f5f9f30dd6ee5cd11e1787f57bff076977bfda19
Parents: 6c768fc
Author: Owen O'Malley <om...@apache.org>
Authored: Wed Nov 11 15:24:03 2015 -0800
Committer: Owen O'Malley <om...@apache.org>
Committed: Wed Nov 18 13:56:14 2015 -0800
----------------------------------------------------------------------
.../apache/hive/common/util/BloomFilter.java | 18 +-
.../org/apache/hive/common/util/Murmur3.java | 107 +-
.../apache/hive/common/util/TestMurmur3.java | 45 +-
.../hive/ql/io/orc/ColumnStatisticsImpl.java | 79 +-
.../hadoop/hive/ql/io/orc/MemoryManager.java | 6 +-
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 4 +
.../hive/ql/io/orc/StringRedBlackTree.java | 5 +
.../hadoop/hive/ql/io/orc/TypeDescription.java | 74 +
.../apache/hadoop/hive/ql/io/orc/Writer.java | 7 +
.../hadoop/hive/ql/io/orc/WriterImpl.java | 852 +++++-
.../hive/ql/io/orc/TestColumnStatistics.java | 16 +-
.../hive/ql/io/orc/TestMemoryManager.java | 2 +-
.../hadoop/hive/ql/io/orc/TestOrcFile.java | 5 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 12 +-
.../hive/ql/io/orc/TestVectorOrcFile.java | 2744 ++++++++++++++++++
.../hive/ql/exec/vector/BytesColumnVector.java | 13 +
.../hive/ql/exec/vector/ColumnVector.java | 37 +-
.../hive/ql/exec/vector/StructColumnVector.java | 8 +
.../hive/ql/exec/vector/UnionColumnVector.java | 8 +
.../hive/ql/exec/vector/VectorizedRowBatch.java | 10 +
20 files changed, 3917 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
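The shape of the new write path, as a minimal sketch: build a schema, get a matching VectorizedRowBatch from it, fill the vectors, and hand full batches to the writer. TypeDescription.createRowBatch and Writer.addRowBatch are the entry points added by this patch; the output path is illustrative, and setSchema on OrcFile.WriterOptions is assumed to already exist outside this diff.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    Configuration conf = new Configuration();
    TypeDescription schema = TypeDescription.createStruct()
        .addField("x", TypeDescription.createLong());
    Writer writer = OrcFile.createWriter(new Path("/tmp/vector-sketch.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));   // setSchema assumed
    VectorizedRowBatch batch = schema.createRowBatch();   // added by this patch
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    for (long row = 0; row < 10000; ++row) {
      x.vector[batch.size++] = row * 3;
      if (batch.size == VectorizedRowBatch.DEFAULT_SIZE) {
        writer.addRowBatch(batch);                        // added by this patch
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);                          // flush the partial tail
    }
    writer.close();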
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/common/src/java/org/apache/hive/common/util/BloomFilter.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java
index d894241..bb0b8f2 100644
--- a/common/src/java/org/apache/hive/common/util/BloomFilter.java
+++ b/common/src/java/org/apache/hive/common/util/BloomFilter.java
@@ -89,20 +89,21 @@ public class BloomFilter {
public void add(byte[] val) {
if (val == null) {
- addBytes(val, -1);
+ addBytes(val, -1, -1);
} else {
- addBytes(val, val.length);
+ addBytes(val, 0, val.length);
}
}
- public void addBytes(byte[] val, int length) {
+ public void addBytes(byte[] val, int offset, int length) {
// We use the trick mentioned in "Less Hashing, Same Performance: Building a Better Bloom Filter"
// by Kirsch et al. From the abstract: 'only two hash functions are necessary to effectively
// implement a Bloom filter without any loss in the asymptotic false positive probability'.
// Let's split the 64-bit hashcode into two 32-bit hash codes and employ the technique mentioned
// in the above paper
- long hash64 = val == null ? Murmur3.NULL_HASHCODE : Murmur3.hash64(val, length);
+ long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+ Murmur3.hash64(val, offset, length);
addHash(hash64);
}
@@ -139,13 +140,14 @@ public class BloomFilter {
public boolean test(byte[] val) {
if (val == null) {
- return testBytes(val, -1);
+ return testBytes(val, -1, -1);
}
- return testBytes(val, val.length);
+ return testBytes(val, 0, val.length);
}
- public boolean testBytes(byte[] val, int length) {
- long hash64 = val == null ? Murmur3.NULL_HASHCODE : Murmur3.hash64(val, length);
+ public boolean testBytes(byte[] val, int offset, int length) {
+ long hash64 = val == null ? Murmur3.NULL_HASHCODE :
+ Murmur3.hash64(val, offset, length);
return testHash(hash64);
}
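The comment above cites the Kirsch/Mitzenmacher result; concretely, the filter can derive all k probe positions from the single 64-bit Murmur3 value. A sketch of that derivation, assuming numHashFunctions/numBits/bitSet fields like the ones this class keeps (addHash itself is outside this diff):

    void addHash(long hash64) {
      int hash1 = (int) hash64;            // low 32 bits
      int hash2 = (int) (hash64 >>> 32);   // high 32 bits
      for (int i = 1; i <= numHashFunctions; ++i) {
        int combinedHash = hash1 + i * hash2;   // g_i(x) = h1(x) + i * h2(x)
        if (combinedHash < 0) {
          combinedHash = ~combinedHash;         // keep the bit index non-negative
        }
        bitSet.set(combinedHash % numBits);
      }
    }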
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/common/src/java/org/apache/hive/common/util/Murmur3.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hive/common/util/Murmur3.java b/common/src/java/org/apache/hive/common/util/Murmur3.java
index 087407a..88c3514 100644
--- a/common/src/java/org/apache/hive/common/util/Murmur3.java
+++ b/common/src/java/org/apache/hive/common/util/Murmur3.java
@@ -128,11 +128,11 @@ public class Murmur3 {
* @return - hashcode
*/
public static long hash64(byte[] data) {
- return hash64(data, data.length, DEFAULT_SEED);
+ return hash64(data, 0, data.length, DEFAULT_SEED);
}
- public static long hash64(byte[] data, int length) {
- return hash64(data, length, DEFAULT_SEED);
+ public static long hash64(byte[] data, int offset, int length) {
+ return hash64(data, offset, length, DEFAULT_SEED);
}
/**
@@ -143,21 +143,21 @@ public class Murmur3 {
* @param seed - seed. (default is 0)
* @return - hashcode
*/
- public static long hash64(byte[] data, int length, int seed) {
+ public static long hash64(byte[] data, int offset, int length, int seed) {
long hash = seed;
final int nblocks = length >> 3;
// body
for (int i = 0; i < nblocks; i++) {
final int i8 = i << 3;
- long k = ((long) data[i8] & 0xff)
- | (((long) data[i8 + 1] & 0xff) << 8)
- | (((long) data[i8 + 2] & 0xff) << 16)
- | (((long) data[i8 + 3] & 0xff) << 24)
- | (((long) data[i8 + 4] & 0xff) << 32)
- | (((long) data[i8 + 5] & 0xff) << 40)
- | (((long) data[i8 + 6] & 0xff) << 48)
- | (((long) data[i8 + 7] & 0xff) << 56);
+ long k = ((long) data[offset + i8] & 0xff)
+ | (((long) data[offset + i8 + 1] & 0xff) << 8)
+ | (((long) data[offset + i8 + 2] & 0xff) << 16)
+ | (((long) data[offset + i8 + 3] & 0xff) << 24)
+ | (((long) data[offset + i8 + 4] & 0xff) << 32)
+ | (((long) data[offset + i8 + 5] & 0xff) << 40)
+ | (((long) data[offset + i8 + 6] & 0xff) << 48)
+ | (((long) data[offset + i8 + 7] & 0xff) << 56);
// mix functions
k *= C1;
@@ -172,19 +172,19 @@ public class Murmur3 {
int tailStart = nblocks << 3;
switch (length - tailStart) {
case 7:
- k1 ^= ((long) data[tailStart + 6] & 0xff) << 48;
+ k1 ^= ((long) data[offset + tailStart + 6] & 0xff) << 48;
case 6:
- k1 ^= ((long) data[tailStart + 5] & 0xff) << 40;
+ k1 ^= ((long) data[offset + tailStart + 5] & 0xff) << 40;
case 5:
- k1 ^= ((long) data[tailStart + 4] & 0xff) << 32;
+ k1 ^= ((long) data[offset + tailStart + 4] & 0xff) << 32;
case 4:
- k1 ^= ((long) data[tailStart + 3] & 0xff) << 24;
+ k1 ^= ((long) data[offset + tailStart + 3] & 0xff) << 24;
case 3:
- k1 ^= ((long) data[tailStart + 2] & 0xff) << 16;
+ k1 ^= ((long) data[offset + tailStart + 2] & 0xff) << 16;
case 2:
- k1 ^= ((long) data[tailStart + 1] & 0xff) << 8;
+ k1 ^= ((long) data[offset + tailStart + 1] & 0xff) << 8;
case 1:
- k1 ^= ((long) data[tailStart] & 0xff);
+ k1 ^= ((long) data[offset + tailStart] & 0xff);
k1 *= C1;
k1 = Long.rotateLeft(k1, R1);
k1 *= C2;
@@ -205,18 +205,19 @@ public class Murmur3 {
* @return - hashcode (2 longs)
*/
public static long[] hash128(byte[] data) {
- return hash128(data, data.length, DEFAULT_SEED);
+ return hash128(data, 0, data.length, DEFAULT_SEED);
}
/**
* Murmur3 128-bit variant.
*
* @param data - input byte array
+ * @param offset - offset into the array at which the data to hash begins
* @param length - length of array
* @param seed - seed. (default is 0)
* @return - hashcode (2 longs)
*/
- public static long[] hash128(byte[] data, int length, int seed) {
+ public static long[] hash128(byte[] data, int offset, int length, int seed) {
long h1 = seed;
long h2 = seed;
final int nblocks = length >> 4;
@@ -224,23 +225,23 @@ public class Murmur3 {
// body
for (int i = 0; i < nblocks; i++) {
final int i16 = i << 4;
- long k1 = ((long) data[i16] & 0xff)
- | (((long) data[i16 + 1] & 0xff) << 8)
- | (((long) data[i16 + 2] & 0xff) << 16)
- | (((long) data[i16 + 3] & 0xff) << 24)
- | (((long) data[i16 + 4] & 0xff) << 32)
- | (((long) data[i16 + 5] & 0xff) << 40)
- | (((long) data[i16 + 6] & 0xff) << 48)
- | (((long) data[i16 + 7] & 0xff) << 56);
-
- long k2 = ((long) data[i16 + 8] & 0xff)
- | (((long) data[i16 + 9] & 0xff) << 8)
- | (((long) data[i16 + 10] & 0xff) << 16)
- | (((long) data[i16 + 11] & 0xff) << 24)
- | (((long) data[i16 + 12] & 0xff) << 32)
- | (((long) data[i16 + 13] & 0xff) << 40)
- | (((long) data[i16 + 14] & 0xff) << 48)
- | (((long) data[i16 + 15] & 0xff) << 56);
+ long k1 = ((long) data[offset + i16] & 0xff)
+ | (((long) data[offset + i16 + 1] & 0xff) << 8)
+ | (((long) data[offset + i16 + 2] & 0xff) << 16)
+ | (((long) data[offset + i16 + 3] & 0xff) << 24)
+ | (((long) data[offset + i16 + 4] & 0xff) << 32)
+ | (((long) data[offset + i16 + 5] & 0xff) << 40)
+ | (((long) data[offset + i16 + 6] & 0xff) << 48)
+ | (((long) data[offset + i16 + 7] & 0xff) << 56);
+
+ long k2 = ((long) data[offset + i16 + 8] & 0xff)
+ | (((long) data[offset + i16 + 9] & 0xff) << 8)
+ | (((long) data[offset + i16 + 10] & 0xff) << 16)
+ | (((long) data[offset + i16 + 11] & 0xff) << 24)
+ | (((long) data[offset + i16 + 12] & 0xff) << 32)
+ | (((long) data[offset + i16 + 13] & 0xff) << 40)
+ | (((long) data[offset + i16 + 14] & 0xff) << 48)
+ | (((long) data[offset + i16 + 15] & 0xff) << 56);
// mix functions for k1
k1 *= C1;
@@ -267,40 +268,40 @@ public class Murmur3 {
int tailStart = nblocks << 4;
switch (length - tailStart) {
case 15:
- k2 ^= (long) (data[tailStart + 14] & 0xff) << 48;
+ k2 ^= (long) (data[offset + tailStart + 14] & 0xff) << 48;
case 14:
- k2 ^= (long) (data[tailStart + 13] & 0xff) << 40;
+ k2 ^= (long) (data[offset + tailStart + 13] & 0xff) << 40;
case 13:
- k2 ^= (long) (data[tailStart + 12] & 0xff) << 32;
+ k2 ^= (long) (data[offset + tailStart + 12] & 0xff) << 32;
case 12:
- k2 ^= (long) (data[tailStart + 11] & 0xff) << 24;
+ k2 ^= (long) (data[offset + tailStart + 11] & 0xff) << 24;
case 11:
- k2 ^= (long) (data[tailStart + 10] & 0xff) << 16;
+ k2 ^= (long) (data[offset + tailStart + 10] & 0xff) << 16;
case 10:
- k2 ^= (long) (data[tailStart + 9] & 0xff) << 8;
+ k2 ^= (long) (data[offset + tailStart + 9] & 0xff) << 8;
case 9:
- k2 ^= (long) (data[tailStart + 8] & 0xff);
+ k2 ^= (long) (data[offset + tailStart + 8] & 0xff);
k2 *= C2;
k2 = Long.rotateLeft(k2, R3);
k2 *= C1;
h2 ^= k2;
case 8:
- k1 ^= (long) (data[tailStart + 7] & 0xff) << 56;
+ k1 ^= (long) (data[offset + tailStart + 7] & 0xff) << 56;
case 7:
- k1 ^= (long) (data[tailStart + 6] & 0xff) << 48;
+ k1 ^= (long) (data[offset + tailStart + 6] & 0xff) << 48;
case 6:
- k1 ^= (long) (data[tailStart + 5] & 0xff) << 40;
+ k1 ^= (long) (data[offset + tailStart + 5] & 0xff) << 40;
case 5:
- k1 ^= (long) (data[tailStart + 4] & 0xff) << 32;
+ k1 ^= (long) (data[offset + tailStart + 4] & 0xff) << 32;
case 4:
- k1 ^= (long) (data[tailStart + 3] & 0xff) << 24;
+ k1 ^= (long) (data[offset + tailStart + 3] & 0xff) << 24;
case 3:
- k1 ^= (long) (data[tailStart + 2] & 0xff) << 16;
+ k1 ^= (long) (data[offset + tailStart + 2] & 0xff) << 16;
case 2:
- k1 ^= (long) (data[tailStart + 1] & 0xff) << 8;
+ k1 ^= (long) (data[offset + tailStart + 1] & 0xff) << 8;
case 1:
- k1 ^= (long) (data[tailStart] & 0xff);
+ k1 ^= (long) (data[offset + tailStart] & 0xff);
k1 *= C1;
k1 = Long.rotateLeft(k1, R1);
k1 *= C2;
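Threading the offset through every variant is what lets callers hash a slice of a shared buffer in place; a BytesColumnVector, for instance, keeps many values in one byte[] addressed by (start, length) pairs. A small equivalence sketch:

    static void sliceHashSketch(byte[] shared, int start, int len) {
      long viaCopy = Murmur3.hash64(
          java.util.Arrays.copyOfRange(shared, start, start + len));
      long inPlace = Murmur3.hash64(shared, start, len);  // same result, no copy
      assert viaCopy == inPlace;
    }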
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/common/src/test/org/apache/hive/common/util/TestMurmur3.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hive/common/util/TestMurmur3.java b/common/src/test/org/apache/hive/common/util/TestMurmur3.java
index e506f71..5facc7c 100644
--- a/common/src/test/org/apache/hive/common/util/TestMurmur3.java
+++ b/common/src/test/org/apache/hive/common/util/TestMurmur3.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.util.Arrays;
import java.util.Random;
/**
@@ -102,7 +103,7 @@ public class TestMurmur3 {
buf.flip();
long gl1 = buf.getLong();
long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed);
+ long[] hc = Murmur3.hash128(key.getBytes(), 0, key.getBytes().length, seed);
long m1 = hc[0];
long m2 = hc[1];
assertEquals(gl1, m1);
@@ -114,11 +115,39 @@ public class TestMurmur3 {
buf.flip();
gl1 = buf.getLong();
gl2 = buf.getLong(8);
- hc = Murmur3.hash128(key.getBytes(), key.getBytes().length, seed);
+ byte[] keyBytes = key.getBytes();
+ hc = Murmur3.hash128(keyBytes, 0, keyBytes.length, seed);
m1 = hc[0];
m2 = hc[1];
assertEquals(gl1, m1);
assertEquals(gl2, m2);
+
+ byte[] offsetKeyBytes = new byte[keyBytes.length + 35];
+ Arrays.fill(offsetKeyBytes, (byte) -1);
+ System.arraycopy(keyBytes, 0, offsetKeyBytes, 35, keyBytes.length);
+ hc = Murmur3.hash128(offsetKeyBytes, 35, keyBytes.length, seed);
+ assertEquals(gl1, hc[0]);
+ assertEquals(gl2, hc[1]);
+ }
+
+ @Test
+ public void testHashCodeM3_64() {
+ byte[] origin = ("It was the best of times, it was the worst of times," +
+ " it was the age of wisdom, it was the age of foolishness," +
+ " it was the epoch of belief, it was the epoch of incredulity," +
+ " it was the season of Light, it was the season of Darkness," +
+ " it was the spring of hope, it was the winter of despair," +
+ " we had everything before us, we had nothing before us," +
+ " we were all going direct to Heaven," +
+ " we were all going direct the other way.").getBytes();
+ long hash = Murmur3.hash64(origin, 0, origin.length);
+ assertEquals(305830725663368540L, hash);
+
+ byte[] originOffset = new byte[origin.length + 150];
+ Arrays.fill(originOffset, (byte) 123);
+ System.arraycopy(origin, 0, originOffset, 150, origin.length);
+ hash = Murmur3.hash64(originOffset, 150, origin.length);
+ assertEquals(305830725663368540L, hash);
}
@Test
@@ -135,11 +164,17 @@ public class TestMurmur3 {
buf.flip();
long gl1 = buf.getLong();
long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(data, data.length, seed);
+ long[] hc = Murmur3.hash128(data, 0, data.length, seed);
long m1 = hc[0];
long m2 = hc[1];
assertEquals(gl1, m1);
assertEquals(gl2, m2);
+
+ byte[] offsetData = new byte[data.length + 50];
+ System.arraycopy(data, 0, offsetData, 50, data.length);
+ hc = Murmur3.hash128(offsetData, 50, data.length, seed);
+ assertEquals(gl1, hc[0]);
+ assertEquals(gl2, hc[1]);
}
}
@@ -157,7 +192,7 @@ public class TestMurmur3 {
buf.flip();
long gl1 = buf.getLong();
long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(data, data.length, seed);
+ long[] hc = Murmur3.hash128(data, 0, data.length, seed);
long m1 = hc[0];
long m2 = hc[1];
assertEquals(gl1, m1);
@@ -179,7 +214,7 @@ public class TestMurmur3 {
buf.flip();
long gl1 = buf.getLong();
long gl2 = buf.getLong(8);
- long[] hc = Murmur3.hash128(data, data.length, seed);
+ long[] hc = Murmur3.hash128(data, 0, data.length, seed);
long m1 = hc[0];
long m2 = hc[1];
assertEquals(gl1, m1);
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
index f39d3e2..bcca9de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
class ColumnStatisticsImpl implements ColumnStatistics {
@@ -47,9 +48,9 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
- void updateBoolean(boolean value) {
+ void updateBoolean(boolean value, int repetitions) {
if (value) {
- trueCount += 1;
+ trueCount += repetitions;
}
}
@@ -132,7 +133,7 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
- void updateInteger(long value) {
+ void updateInteger(long value, int repetitions) {
if (!hasMinimum) {
hasMinimum = true;
minimum = value;
@@ -144,7 +145,7 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
if (!overflow) {
boolean wasPositive = sum >= 0;
- sum += value;
+ sum += value * repetitions;
if ((value >= 0) == wasPositive) {
overflow = (sum >= 0) != wasPositive;
}
@@ -398,6 +399,23 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
+ void updateString(byte[] bytes, int offset, int length, int repetitions) {
+ if (minimum == null) {
+ maximum = minimum = new Text();
+ maximum.set(bytes, offset, length);
+ } else if (WritableComparator.compareBytes(minimum.getBytes(), 0,
+ minimum.getLength(), bytes, offset, length) > 0) {
+ minimum = new Text();
+ minimum.set(bytes, offset, length);
+ } else if (WritableComparator.compareBytes(maximum.getBytes(), 0,
+ maximum.getLength(), bytes, offset, length) < 0) {
+ maximum = new Text();
+ maximum.set(bytes, offset, length);
+ }
+ sum += length * repetitions;
+ }
+
+ @Override
void merge(ColumnStatisticsImpl other) {
if (other instanceof StringStatisticsImpl) {
StringStatisticsImpl str = (StringStatisticsImpl) other;
@@ -498,6 +516,11 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
+ void updateBinary(byte[] bytes, int offset, int length, int repetitions) {
+ sum += length * repetitions;
+ }
+
+ @Override
void merge(ColumnStatisticsImpl other) {
if (other instanceof BinaryColumnStatistics) {
BinaryStatisticsImpl bin = (BinaryStatisticsImpl) other;
@@ -700,6 +723,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
+ void updateDate(int value) {
+ if (minimum == null) {
+ minimum = value;
+ maximum = value;
+ } else if (minimum > value) {
+ minimum = value;
+ } else if (maximum < value) {
+ maximum = value;
+ }
+ }
+
+ @Override
void merge(ColumnStatisticsImpl other) {
if (other instanceof DateStatisticsImpl) {
DateStatisticsImpl dateStats = (DateStatisticsImpl) other;
@@ -809,6 +844,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
}
@Override
+ void updateTimestamp(long value) {
+ if (minimum == null) {
+ minimum = value;
+ maximum = value;
+ } else if (minimum > value) {
+ minimum = value;
+ } else if (maximum < value) {
+ maximum = value;
+ }
+ }
+
+ @Override
void merge(ColumnStatisticsImpl other) {
if (other instanceof TimestampStatisticsImpl) {
TimestampStatisticsImpl timestampStats = (TimestampStatisticsImpl) other;
@@ -889,15 +936,19 @@ class ColumnStatisticsImpl implements ColumnStatistics {
count += 1;
}
+ void increment(int count) {
+ this.count += count;
+ }
+
void setNull() {
hasNull = true;
}
- void updateBoolean(boolean value) {
+ void updateBoolean(boolean value, int repetitions) {
throw new UnsupportedOperationException("Can't update boolean");
}
- void updateInteger(long value) {
+ void updateInteger(long value, int repetitions) {
throw new UnsupportedOperationException("Can't update integer");
}
@@ -909,10 +960,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
throw new UnsupportedOperationException("Can't update string");
}
+ void updateString(byte[] bytes, int offset, int length, int repetitions) {
+ throw new UnsupportedOperationException("Can't update string");
+ }
+
void updateBinary(BytesWritable value) {
throw new UnsupportedOperationException("Can't update binary");
}
+ void updateBinary(byte[] bytes, int offset, int length, int repetitions) {
+ throw new UnsupportedOperationException("Can't update binary");
+ }
+
void updateDecimal(HiveDecimal value) {
throw new UnsupportedOperationException("Can't update decimal");
}
@@ -921,10 +980,18 @@ class ColumnStatisticsImpl implements ColumnStatistics {
throw new UnsupportedOperationException("Can't update date");
}
+ void updateDate(int value) {
+ throw new UnsupportedOperationException("Can't update date");
+ }
+
void updateTimestamp(Timestamp value) {
throw new UnsupportedOperationException("Can't update timestamp");
}
+ void updateTimestamp(long value) {
+ throw new UnsupportedOperationException("Can't update timestamp");
+ }
+
boolean isStatsExists() {
return (count > 0 || hasNull == true);
}
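The updateInteger change above folds repetitions into the running sum while keeping the existing sign-based overflow check: adding two same-signed numbers overflows exactly when the result's sign flips. The idiom in isolation (it assumes, as the patch does, that value * repetitions does not itself overflow):

    static boolean addWouldOverflow(long sum, long addend) {
      boolean wasPositive = sum >= 0;
      long next = sum + addend;
      // mixed-sign additions never overflow; same-sign ones overflow
      // exactly when the result's sign differs from the operands'
      return (addend >= 0) == wasPositive && (next >= 0) != wasPositive;
    }
    // addWouldOverflow(Long.MAX_VALUE, 1L)  -> true
    // addWouldOverflow(Long.MAX_VALUE, -1L) -> false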
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
index 4d5f735..bb35b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java
@@ -172,10 +172,12 @@ class MemoryManager {
/**
* Give the memory manager an opportunity for doing a memory check.
+ * @param rows number of rows added
* @throws IOException
*/
- void addedRow() throws IOException {
- if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
+ void addedRow(int rows) throws IOException {
+ rowsAddedSinceCheck += rows;
+ if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) {
notifyWriters();
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 04b9eaf..84d627a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -1169,6 +1169,10 @@ public class RecordReaderImpl implements RecordReader {
return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
}
+ MetadataReader getMetadataReader() {
+ return metadata;
+ }
+
private int findStripe(long rowNumber) {
for (int i = 0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
index 6094175..e0c52e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
@@ -55,6 +55,11 @@ class StringRedBlackTree extends RedBlackTree {
return addNewKey();
}
+ public int add(byte[] bytes, int offset, int length) {
+ newKey.set(bytes, offset, length);
+ return addNewKey();
+ }
+
@Override
protected int compareValue(int position) {
int start = keyOffsets.get(position);
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
index 3481bb3..b365408 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
@@ -18,6 +18,17 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -264,6 +275,69 @@ public class TypeDescription {
return maxId;
}
+ private ColumnVector createColumn() {
+ switch (category) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case TIMESTAMP:
+ case DATE:
+ return new LongColumnVector();
+ case FLOAT:
+ case DOUBLE:
+ return new DoubleColumnVector();
+ case DECIMAL:
+ return new DecimalColumnVector(precision, scale);
+ case STRING:
+ case BINARY:
+ case CHAR:
+ case VARCHAR:
+ return new BytesColumnVector();
+ case STRUCT: {
+ ColumnVector[] fieldVector = new ColumnVector[children.size()];
+ for(int i=0; i < fieldVector.length; ++i) {
+ fieldVector[i] = children.get(i).createColumn();
+ }
+ return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+ fieldVector);
+ }
+ case UNION: {
+ ColumnVector[] fieldVector = new ColumnVector[children.size()];
+ for(int i=0; i < fieldVector.length; ++i) {
+ fieldVector[i] = children.get(i).createColumn();
+ }
+ return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+ fieldVector);
+ }
+ case LIST:
+ return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+ children.get(0).createColumn());
+ case MAP:
+ return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+ children.get(0).createColumn(), children.get(1).createColumn());
+ default:
+ throw new IllegalArgumentException("Unknown type " + category);
+ }
+ }
+
+ public VectorizedRowBatch createRowBatch() {
+ VectorizedRowBatch result;
+ if (category == Category.STRUCT) {
+ result = new VectorizedRowBatch(children.size(),
+ VectorizedRowBatch.DEFAULT_SIZE);
+ for(int i=0; i < result.cols.length; ++i) {
+ result.cols[i] = children.get(i).createColumn();
+ }
+ } else {
+ result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE);
+ result.cols[0] = createColumn();
+ }
+ result.reset();
+ return result;
+ }
+
/**
* Get the kind of this type.
* @return get the category for this type.
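createColumn() recurses on the schema's category, so the batch returned by createRowBatch() mirrors the schema's shape, with nested types coming back as nested vectors. A sketch using the existing TypeDescription factory methods:

    TypeDescription schema = TypeDescription.createStruct()
        .addField("name", TypeDescription.createString())
        .addField("tags", TypeDescription.createList(TypeDescription.createLong()));
    VectorizedRowBatch batch = schema.createRowBatch();
    BytesColumnVector name = (BytesColumnVector) batch.cols[0];
    ListColumnVector tags = (ListColumnVector) batch.cols[1];
    LongColumnVector tagValues = (LongColumnVector) tags.child;  // list element vector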
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
index 8991f2d..1873ed1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.io.IOException;
@@ -52,6 +53,12 @@ public interface Writer {
void addRow(Object row) throws IOException;
/**
+ * Add a row batch to the ORC file.
+ * @param batch the rows to add
+ */
+ void addRowBatch(VectorizedRowBatch batch) throws IOException;
+
+ /**
* Flush all of the buffers and close the file. No methods on this writer
* should be called afterwards.
* @throws IOException
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 5a82d20..c3916d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -26,6 +26,7 @@ import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
@@ -582,7 +593,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
private abstract static class TreeWriter {
protected final int id;
protected final ObjectInspector inspector;
- private final BitFieldWriter isPresent;
+ protected final BitFieldWriter isPresent;
private final boolean isCompressed;
protected final ColumnStatisticsImpl indexStatistics;
protected final ColumnStatisticsImpl stripeColStatistics;
@@ -708,6 +719,73 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
+ /**
+ * Handle the top level object write.
+ *
+ * This default method is used for all types except structs (the typical
+ * case). VectorizedRowBatch assumes the top level object is a struct, so
+ * for every other type we write the first column.
+ * @param batch the batch to write from
+ * @param offset the row to start on
+ * @param length the number of rows to write
+ * @throws IOException
+ */
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
+ writeBatch(batch.cols[0], offset, length);
+ }
+
+ /**
+ * Write the values from the given vector from offset for length elements.
+ * @param vector the vector to write from
+ * @param offset the first value from the vector to write
+ * @param length the number of values from the vector to write
+ * @throws IOException
+ */
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ if (vector.noNulls) {
+ indexStatistics.increment(length);
+ if (isPresent != null) {
+ for (int i = 0; i < length; ++i) {
+ isPresent.write(1);
+ }
+ }
+ } else {
+ if (vector.isRepeating) {
+ boolean isNull = vector.isNull[0];
+ if (isPresent != null) {
+ for (int i = 0; i < length; ++i) {
+ isPresent.write(isNull ? 0 : 1);
+ }
+ }
+ if (isNull) {
+ foundNulls = true;
+ indexStatistics.setNull();
+ } else {
+ indexStatistics.increment(length);
+ }
+ } else {
+ // count the number of non-null values
+ int nonNullCount = 0;
+ for(int i = 0; i < length; ++i) {
+ boolean isNull = vector.isNull[i + offset];
+ if (!isNull) {
+ nonNullCount += 1;
+ }
+ if (isPresent != null) {
+ isPresent.write(isNull ? 0 : 1);
+ }
+ }
+ indexStatistics.increment(nonNullCount);
+ if (nonNullCount != length) {
+ foundNulls = true;
+ indexStatistics.setNull();
+ }
+ }
+ }
+ }
+
private void removeIsPresentPositions() {
for(int i=0; i < rowIndex.getEntryCount(); ++i) {
RowIndexEntry.Builder entry = rowIndex.getEntryBuilder(i);
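Every concrete writeBatch in this patch follows the same shape on top of this base method: the super call handles the PRESENT stream and null counting, and the subclass writes only the non-null values, with a fast path when isRepeating marks the vector as a single value repeated. A schematic subclass, not a real TreeWriter:

    @Override
    void writeBatch(ColumnVector vector, int offset,
                    int length) throws IOException {
      super.writeBatch(vector, offset, length);        // PRESENT bits + statistics
      LongColumnVector vec = (LongColumnVector) vector;
      if (vector.isRepeating) {
        if (vector.noNulls || !vector.isNull[0]) {     // one value, written length times
          for (int i = 0; i < length; ++i) {
            writer.write(vec.vector[0]);
          }
        }                                              // an all-null run writes nothing
      } else {
        for (int i = 0; i < length; ++i) {
          if (vec.noNulls || !vec.isNull[i + offset]) {  // skip null slots
            writer.write(vec.vector[i + offset]);
          }
        }
      }
    }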
@@ -876,12 +954,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.write(obj);
if (obj != null) {
boolean val = ((BooleanObjectInspector) inspector).get(obj);
- indexStatistics.updateBoolean(val);
+ indexStatistics.updateBoolean(val, 1);
writer.write(val ? 1 : 0);
}
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int value = vec.vector[0] == 0 ? 0 : 1;
+ indexStatistics.updateBoolean(value != 0, length);
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int value = vec.vector[i + offset] == 0 ? 0 : 1;
+ writer.write(value);
+ indexStatistics.updateBoolean(value != 0, 1);
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -915,7 +1017,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.write(obj);
if (obj != null) {
byte val = ((ByteObjectInspector) inspector).get(obj);
- indexStatistics.updateInteger(val);
+ indexStatistics.updateInteger(val, 1);
if (createBloomFilter) {
bloomFilter.addLong(val);
}
@@ -924,6 +1026,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte value = (byte) vec.vector[0];
+ indexStatistics.updateInteger(value, length);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ byte value = (byte) vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateInteger(value, 1);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -994,7 +1126,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
} else {
val = shortInspector.get(obj);
}
- indexStatistics.updateInteger(val);
+ indexStatistics.updateInteger(val, 1);
if (createBloomFilter) {
// integers are converted to longs in column statistics and during SARG evaluation
bloomFilter.addLong(val);
@@ -1004,6 +1136,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ long value = vec.vector[0];
+ indexStatistics.updateInteger(value, length);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ long value = vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateInteger(value, 1);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1049,6 +1211,37 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DoubleColumnVector vec = (DoubleColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ float value = (float) vec.vector[0];
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ for(int i=0; i < length; ++i) {
+ utils.writeFloat(stream, value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ float value = (float) vec.vector[i + offset];
+ utils.writeFloat(stream, value);
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ }
+ }
+ }
+ }
+
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1093,6 +1286,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DoubleColumnVector vec = (DoubleColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ double value = vec.vector[0];
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ for(int i=0; i < length; ++i) {
+ utils.writeDouble(stream, value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ double value = vec.vector[i + offset];
+ utils.writeDouble(stream, value);
+ indexStatistics.updateDouble(value);
+ if (createBloomFilter) {
+ bloomFilter.addDouble(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1107,16 +1330,16 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
- private static class StringTreeWriter extends TreeWriter {
+ private static abstract class StringBaseTreeWriter extends TreeWriter {
private static final int INITIAL_DICTIONARY_SIZE = 4096;
private final OutStream stringOutput;
private final IntegerWriter lengthOutput;
private final IntegerWriter rowOutput;
- private final StringRedBlackTree dictionary =
+ protected final StringRedBlackTree dictionary =
new StringRedBlackTree(INITIAL_DICTIONARY_SIZE);
- private final DynamicIntArray rows = new DynamicIntArray();
- private final PositionedOutputStream directStreamOutput;
- private final IntegerWriter directLengthOutput;
+ protected final DynamicIntArray rows = new DynamicIntArray();
+ protected final PositionedOutputStream directStreamOutput;
+ protected final IntegerWriter directLengthOutput;
private final List<OrcProto.RowIndexEntry> savedRowIndex =
new ArrayList<OrcProto.RowIndexEntry>();
private final boolean buildIndex;
@@ -1124,12 +1347,12 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
// If the number of keys in a dictionary is greater than this fraction of
// the total number of non-null rows, turn off dictionary encoding
private final double dictionaryKeySizeThreshold;
- private boolean useDictionaryEncoding = true;
+ protected boolean useDictionaryEncoding = true;
private boolean isDirectV2 = true;
private boolean doneDictionaryCheck;
private final boolean strideDictionaryCheck;
- StringTreeWriter(int columnId,
+ StringBaseTreeWriter(int columnId,
ObjectInspector inspector,
TypeDescription schema,
StreamFactory writer,
@@ -1171,7 +1394,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
super.write(obj);
if (obj != null) {
Text val = getTextValue(obj);
- if (useDictionaryEncoding || !strideDictionaryCheck) {
+ if (useDictionaryEncoding) {
rows.add(dictionary.add(val));
} else {
// write data and length
@@ -1180,7 +1403,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
indexStatistics.updateString(val);
if (createBloomFilter) {
- bloomFilter.addBytes(val.getBytes(), val.getLength());
+ bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
}
}
}
@@ -1364,10 +1587,69 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
}
+ private static class StringTreeWriter extends StringBaseTreeWriter {
+ StringTreeWriter(int columnId,
+ ObjectInspector inspector,
+ TypeDescription schema,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, schema, writer, nullable);
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ if (useDictionaryEncoding) {
+ int id = dictionary.add(vec.vector[0], vec.start[0], vec.length[0]);
+ for(int i=0; i < length; ++i) {
+ rows.add(id);
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ directStreamOutput.write(vec.vector[0], vec.start[0],
+ vec.length[0]);
+ directLengthOutput.write(vec.length[0]);
+ }
+ }
+ indexStatistics.updateString(vec.vector[0], vec.start[0],
+ vec.length[0], length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ if (useDictionaryEncoding) {
+ rows.add(dictionary.add(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]));
+ } else {
+ directStreamOutput.write(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ directLengthOutput.write(vec.length[offset + i]);
+ }
+ indexStatistics.updateString(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i], 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* Under the covers, char is written to ORC the same way as string.
*/
- private static class CharTreeWriter extends StringTreeWriter {
+ private static class CharTreeWriter extends StringBaseTreeWriter {
+ private final int itemLength;
+ private final byte[] padding;
CharTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1375,6 +1657,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, schema, writer, nullable);
+ itemLength = schema.getMaxLength();
+ padding = new byte[itemLength];
}
/**
@@ -1385,12 +1669,79 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return (((HiveCharObjectInspector) inspector)
.getPrimitiveWritableObject(obj)).getTextValue();
}
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte[] ptr;
+ int ptrOffset;
+ if (vec.length[0] >= itemLength) {
+ ptr = vec.vector[0];
+ ptrOffset = vec.start[0];
+ } else {
+ ptr = padding;
+ ptrOffset = 0;
+ System.arraycopy(vec.vector[0], vec.start[0], ptr, 0,
+ vec.length[0]);
+ Arrays.fill(ptr, vec.length[0], itemLength, (byte) ' ');
+ }
+ if (useDictionaryEncoding) {
+ int id = dictionary.add(ptr, ptrOffset, itemLength);
+ for(int i=0; i < length; ++i) {
+ rows.add(id);
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ directStreamOutput.write(ptr, ptrOffset, itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ }
+ indexStatistics.updateString(ptr, ptrOffset, itemLength, length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ byte[] ptr;
+ int ptrOffset;
+ if (vec.length[offset + i] >= itemLength) {
+ ptr = vec.vector[offset + i];
+ ptrOffset = vec.start[offset + i];
+ } else {
+ // it is the wrong length, so copy it
+ ptr = padding;
+ ptrOffset = 0;
+ System.arraycopy(vec.vector[offset + i], vec.start[offset + i],
+ ptr, 0, vec.length[offset + i]);
+ Arrays.fill(ptr, vec.length[offset + i], itemLength, (byte) ' ');
+ }
+ if (useDictionaryEncoding) {
+ rows.add(dictionary.add(ptr, ptrOffset, itemLength));
+ } else {
+ directStreamOutput.write(ptr, ptrOffset, itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ indexStatistics.updateString(ptr, ptrOffset, itemLength, 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(ptr, ptrOffset, itemLength);
+ }
+ }
+ }
+ }
+ }
}
/**
* Under the covers, varchar is written to ORC the same way as string.
*/
- private static class VarcharTreeWriter extends StringTreeWriter {
+ private static class VarcharTreeWriter extends StringBaseTreeWriter {
+ private final int maxLength;
VarcharTreeWriter(int columnId,
ObjectInspector inspector,
@@ -1398,6 +1749,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, schema, writer, nullable);
+ maxLength = schema.getMaxLength();
}
/**
@@ -1408,6 +1760,55 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
return (((HiveVarcharObjectInspector) inspector)
.getPrimitiveWritableObject(obj)).getTextValue();
}
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int itemLength = Math.min(vec.length[0], maxLength);
+ if (useDictionaryEncoding) {
+ int id = dictionary.add(vec.vector[0], vec.start[0], itemLength);
+ for(int i=0; i < length; ++i) {
+ rows.add(id);
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ directStreamOutput.write(vec.vector[0], vec.start[0],
+ itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ }
+ indexStatistics.updateString(vec.vector[0], vec.start[0],
+ itemLength, length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], itemLength);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int itemLength = Math.min(vec.length[offset + i], maxLength);
+ if (useDictionaryEncoding) {
+ rows.add(dictionary.add(vec.vector[offset + i],
+ vec.start[offset + i], itemLength));
+ } else {
+ directStreamOutput.write(vec.vector[offset + i],
+ vec.start[offset + i], itemLength);
+ directLengthOutput.write(itemLength);
+ }
+ indexStatistics.updateString(vec.vector[offset + i],
+ vec.start[offset + i], itemLength, 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], itemLength);
+ }
+ }
+ }
+ }
+ }
}
private static class BinaryTreeWriter extends TreeWriter {
@@ -1449,12 +1850,48 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
length.write(val.getLength());
indexStatistics.updateBinary(val);
if (createBloomFilter) {
- bloomFilter.addBytes(val.getBytes(), val.getLength());
+ bloomFilter.addBytes(val.getBytes(), 0, val.getLength());
}
}
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ BytesColumnVector vec = (BytesColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ for(int i=0; i < length; ++i) {
+ stream.write(vec.vector[0], vec.start[0],
+ vec.length[0]);
+ this.length.write(vec.length[0]);
+ }
+ indexStatistics.updateBinary(vec.vector[0], vec.start[0],
+ vec.length[0], length);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[0], vec.start[0], vec.length[0]);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ stream.write(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ this.length.write(vec.length[offset + i]);
+ indexStatistics.updateBinary(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i], 1);
+ if (createBloomFilter) {
+ bloomFilter.addBytes(vec.vector[offset + i],
+ vec.start[offset + i], vec.length[offset + i]);
+ }
+ }
+ }
+ }
+ }
+
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1472,6 +1909,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
static final int MILLIS_PER_SECOND = 1000;
+ static final int NANOS_PER_SECOND = 1000000000;
+ static final int MILLIS_PER_NANO = 1000000;
static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00";
private static class TimestampTreeWriter extends TreeWriter {
@@ -1524,6 +1963,47 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ long value = vec.vector[0];
+ long valueMillis = value / MILLIS_PER_NANO;
+ indexStatistics.updateTimestamp(valueMillis);
+ if (createBloomFilter) {
+ bloomFilter.addLong(valueMillis);
+ }
+ final long secs = value / NANOS_PER_SECOND - base_timestamp;
+ final long nano = formatNanos((int) (value % NANOS_PER_SECOND));
+ for(int i=0; i < length; ++i) {
+ seconds.write(secs);
+ nanos.write(nano);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ long value = vec.vector[i + offset];
+ long valueMillis = value / MILLIS_PER_NANO;
+ long valueSecs = value / NANOS_PER_SECOND - base_timestamp;
+ int valueNanos = (int) (value % NANOS_PER_SECOND);
+ if (valueNanos < 0) {
+ valueNanos += NANOS_PER_SECOND;
+ }
+ seconds.write(valueSecs);
+ nanos.write(formatNanos(valueNanos));
+ indexStatistics.updateTimestamp(valueMillis);
+ if (createBloomFilter) {
+ bloomFilter.addLong(valueMillis);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1588,6 +2068,36 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ LongColumnVector vec = (LongColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int value = (int) vec.vector[0];
+ indexStatistics.updateDate(value);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ for(int i=0; i < length; ++i) {
+ writer.write(value);
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ int value = (int) vec.vector[i + offset];
+ writer.write(value);
+ indexStatistics.updateDate(value);
+ if (createBloomFilter) {
+ bloomFilter.addLong(value);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1660,6 +2170,40 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ DecimalColumnVector vec = (DecimalColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ HiveDecimal value = vec.vector[0].getHiveDecimal();
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilter.addString(value.toString());
+ }
+ for(int i=0; i < length; ++i) {
+ SerializationUtils.writeBigInteger(valueStream,
+ value.unscaledValue());
+ scaleStream.write(value.scale());
+ }
+ }
+ } else {
+ for(int i=0; i < length; ++i) {
+ if (vec.noNulls || !vec.isNull[i + offset]) {
+ HiveDecimal value = vec.vector[i + offset].getHiveDecimal();
+ SerializationUtils.writeBigInteger(valueStream,
+ value.unscaledValue());
+ scaleStream.write(value.scale());
+ indexStatistics.updateDecimal(value);
+ if (createBloomFilter) {
+ bloomFilter.addString(value.toString());
+ }
+ }
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1685,13 +2229,21 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, inspector, schema, writer, nullable);
List<TypeDescription> children = schema.getChildren();
- StructObjectInspector structObjectInspector =
- (StructObjectInspector) inspector;
- fields = structObjectInspector.getAllStructFieldRefs();
+ if (inspector != null) {
+ StructObjectInspector structObjectInspector =
+ (StructObjectInspector) inspector;
+ fields = structObjectInspector.getAllStructFieldRefs();
+ } else {
+ fields = null;
+ }
childrenWriters = new TreeWriter[children.size()];
for(int i=0; i < childrenWriters.length; ++i) {
- ObjectInspector childOI = i < fields.size() ?
- fields.get(i).getFieldObjectInspector() : null;
+ ObjectInspector childOI;
+ if (fields != null && i < fields.size()) {
+ childOI = fields.get(i).getFieldObjectInspector();
+ } else {
+ childOI = null;
+ }
childrenWriters[i] = createTreeWriter(
childOI, children.get(i), writer,
true);
@@ -1713,6 +2265,60 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeRootBatch(VectorizedRowBatch batch, int offset,
+ int length) throws IOException {
+ // update the statistics for the root column
+ indexStatistics.increment(length);
+ // I'm assuming that the root column isn't nullable so that I don't need
+ // to update isPresent.
+ for(int i=0; i < childrenWriters.length; ++i) {
+ childrenWriters[i].writeBatch(batch.cols[i], offset, length);
+ }
+ }
+
+ private static void writeFields(StructColumnVector vector,
+ TreeWriter[] childrenWriters,
+ int offset, int length) throws IOException {
+ for(int field=0; field < childrenWriters.length; ++field) {
+ childrenWriters[field].writeBatch(vector.fields[field], offset, length);
+ }
+ }
+
+ @Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ StructColumnVector vec = (StructColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ writeFields(vec, childrenWriters, offset, length);
+ }
+ } else if (vector.noNulls) {
+ writeFields(vec, childrenWriters, offset, length);
+ } else {
+ // write the records in runs
+ int currentRun = 0;
+ boolean started = false;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ if (!started) {
+ started = true;
+ currentRun = i;
+ }
+ } else if (started) {
+ started = false;
+ writeFields(vec, childrenWriters, offset + currentRun,
+ i - currentRun);
+ }
+ }
+ if (started) {
+ writeFields(vec, childrenWriters, offset + currentRun,
+ length - currentRun);
+ }
+ }
+ }
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1734,8 +2340,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
- ObjectInspector childOI =
- ((ListObjectInspector) inspector).getListElementObjectInspector();
+ ObjectInspector childOI = null;
+ if (inspector != null) {
+ childOI =
+ ((ListObjectInspector) inspector).getListElementObjectInspector();
+ }
childrenWriters = new TreeWriter[1];
childrenWriters[0] =
createTreeWriter(childOI, schema.getChildren().get(0), writer, true);
@@ -1771,6 +2380,52 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ ListColumnVector vec = (ListColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int childOffset = (int) vec.offsets[0];
+ int childLength = (int) vec.lengths[0];
+ for(int i=0; i < length; ++i) {
+ lengths.write(childLength);
+ childrenWriters[0].writeBatch(vec.child, childOffset, childLength);
+ }
+ if (createBloomFilter) {
+ bloomFilter.addLong(childLength);
+ }
+ }
+ } else {
+ // write the elements in runs
+ int currentOffset = 0;
+ int currentLength = 0;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ int nextLength = (int) vec.lengths[offset + i];
+ int nextOffset = (int) vec.offsets[offset + i];
+ lengths.write(nextLength);
+ if (currentLength == 0) {
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else if (currentOffset + currentLength != nextOffset) {
+ childrenWriters[0].writeBatch(vec.child, currentOffset,
+ currentLength);
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else {
+ currentLength += nextLength;
+ }
+ }
+ }
+ if (currentLength != 0) {
+ childrenWriters[0].writeBatch(vec.child, currentOffset,
+ currentLength);
+ }
+ }
+ }
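The run merging above is the heart of the list writer: per-row child ranges
are coalesced into maximal contiguous spans so the child column is visited as
few times as possible. A standalone sketch of that coalescing, with
illustrative names only:

    // Coalesce the (offset, length) child ranges of non-null rows into
    // maximal contiguous spans, as the loop above does before recursing
    // into the child writer.
    static java.util.List<int[]> coalesce(long[] offsets, long[] lengths,
                                          boolean[] isNull,
                                          int offset, int length) {
      java.util.List<int[]> spans = new java.util.ArrayList<>();
      int curOffset = 0;
      int curLength = 0;
      for (int i = 0; i < length; ++i) {
        if (isNull[offset + i]) {
          continue;                                   // nulls add nothing
        }
        int nextOffset = (int) offsets[offset + i];
        int nextLength = (int) lengths[offset + i];
        if (curLength == 0) {                         // first non-null row
          curOffset = nextOffset;
          curLength = nextLength;
        } else if (curOffset + curLength == nextOffset) {
          curLength += nextLength;                    // extend the span
        } else {
          spans.add(new int[]{curOffset, curLength}); // flush, then restart
          curOffset = nextOffset;
          curLength = nextLength;
        }
      }
      if (curLength != 0) {
        spans.add(new int[]{curOffset, curLength});
      }
      return spans;
    }

With offsets {0, 3, 10}, lengths {3, 6, 2}, and no nulls, the first two rows
merge into the span (0, 9) and the third row starts a new span (10, 2).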
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1799,15 +2454,19 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
boolean nullable) throws IOException {
super(columnId, inspector, schema, writer, nullable);
this.isDirectV2 = isNewWriteFormat(writer);
- MapObjectInspector insp = (MapObjectInspector) inspector;
childrenWriters = new TreeWriter[2];
List<TypeDescription> children = schema.getChildren();
+ ObjectInspector keyInsp = null;
+ ObjectInspector valueInsp = null;
+ if (inspector != null) {
+ MapObjectInspector insp = (MapObjectInspector) inspector;
+ keyInsp = insp.getMapKeyObjectInspector();
+ valueInsp = insp.getMapValueObjectInspector();
+ }
childrenWriters[0] =
- createTreeWriter(insp.getMapKeyObjectInspector(), children.get(0),
- writer, true);
+ createTreeWriter(keyInsp, children.get(0), writer, true);
childrenWriters[1] =
- createTreeWriter(insp.getMapValueObjectInspector(), children.get(1),
- writer, true);
+ createTreeWriter(valueInsp, children.get(1), writer, true);
lengths = createIntegerWriter(writer.createStream(columnId,
OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer);
recordPosition(rowIndexPosition);
@@ -1843,6 +2502,57 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ MapColumnVector vec = (MapColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ int childOffset = (int) vec.offsets[0];
+ int childLength = (int) vec.lengths[0];
+ for(int i=0; i < length; ++i) {
+ lengths.write(childLength);
+ childrenWriters[0].writeBatch(vec.keys, childOffset, childLength);
+ childrenWriters[1].writeBatch(vec.values, childOffset, childLength);
+ }
+ if (createBloomFilter) {
+ bloomFilter.addLong(childLength);
+ }
+ }
+ } else {
+ // write the key/value ranges in runs, merging contiguous child ranges
+ int currentOffset = 0;
+ int currentLength = 0;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ int nextLength = (int) vec.lengths[offset + i];
+ int nextOffset = (int) vec.offsets[offset + i];
+ lengths.write(nextLength);
+ if (createBloomFilter) {
+ bloomFilter.addLong(nextLength);
+ }
+ if (currentLength == 0) {
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else if (currentOffset + currentLength != nextOffset) {
+ childrenWriters[0].writeBatch(vec.keys, currentOffset,
+ currentLength);
+ childrenWriters[1].writeBatch(vec.values, currentOffset,
+ currentLength);
+ currentOffset = nextOffset;
+ currentLength = nextLength;
+ } else {
+ currentLength += nextLength;
+ }
+ }
+ }
+ if (currentLength != 0) {
+ childrenWriters[0].writeBatch(vec.keys, currentOffset,
+ currentLength);
+ childrenWriters[1].writeBatch(vec.values, currentOffset,
+ currentLength);
+ }
+ }
+ }
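The map writer follows the same coalescing pattern sketched after the list
writer; the only difference is that each merged span is replayed into both
children, keys first and then values, so the two child columns stay
row-aligned.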
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -1869,13 +2579,17 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
StreamFactory writer,
boolean nullable) throws IOException {
super(columnId, inspector, schema, writer, nullable);
- UnionObjectInspector insp = (UnionObjectInspector) inspector;
- List<ObjectInspector> choices = insp.getObjectInspectors();
+ List<ObjectInspector> choices = null;
+ if (inspector != null) {
+ UnionObjectInspector insp = (UnionObjectInspector) inspector;
+ choices = insp.getObjectInspectors();
+ }
List<TypeDescription> children = schema.getChildren();
childrenWriters = new TreeWriter[children.size()];
for(int i=0; i < childrenWriters.length; ++i) {
- childrenWriters[i] = createTreeWriter(choices.get(i),
- children.get(i), writer, true);
+ childrenWriters[i] =
+ createTreeWriter(choices != null ? choices.get(i) : null,
+ children.get(i), writer, true);
}
tags =
new RunLengthByteWriter(writer.createStream(columnId,
@@ -1898,6 +2612,54 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
}
@Override
+ void writeBatch(ColumnVector vector, int offset,
+ int length) throws IOException {
+ super.writeBatch(vector, offset, length);
+ UnionColumnVector vec = (UnionColumnVector) vector;
+ if (vector.isRepeating) {
+ if (vector.noNulls || !vector.isNull[0]) {
+ byte tag = (byte) vec.tags[0];
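+ // a repeating vector means every row carries the same tag, so the
+ // child batch can be written once for the whole run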
+ for(int i=0; i < length; ++i) {
+ tags.write(tag);
+ }
+ if (createBloomFilter) {
+ bloomFilter.addLong(tag);
+ }
+ childrenWriters[tag].writeBatch(vec.fields[tag], offset, length);
+ }
+ } else {
+ // write the records in runs of the same tag
+ byte prevTag = 0;
+ int currentRun = 0;
+ boolean started = false;
+ for(int i=0; i < length; ++i) {
+ if (!vec.isNull[i + offset]) {
+ byte tag = (byte) vec.tags[offset + i];
+ tags.write(tag);
+ if (createBloomFilter) {
+ bloomFilter.addLong(tag);
+ }
+ if (!started) {
+ started = true;
+ currentRun = i;
+ prevTag = tag;
+ } else if (tag != prevTag) {
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, i - currentRun);
+ currentRun = i;
+ prevTag = tag;
+ }
+ } else if (started) {
+ started = false;
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, i - currentRun);
+ }
+ }
+ if (started) {
+ childrenWriters[prevTag].writeBatch(vec.fields[prevTag],
+ offset + currentRun, length - currentRun);
+ }
+ }
+ }
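Union dispatch adds one more run boundary: a run ends not only at a null row
but whenever the tag changes, since each tag routes to a different child
writer. A hypothetical standalone sketch of the split (the printf stands in
for the dispatch to childrenWriters[tag]):

    // Split non-null rows into runs that share a union tag, mirroring the
    // loop above; each printed run would be dispatched to the child writer
    // for that tag.
    static void tagRuns(int[] tags, boolean[] isNull, int offset, int length) {
      int prevTag = 0;
      int runStart = 0;
      boolean started = false;
      for (int i = 0; i < length; ++i) {
        if (!isNull[offset + i]) {
          int tag = tags[offset + i];
          if (!started) {
            started = true;
            runStart = i;
            prevTag = tag;
          } else if (tag != prevTag) {     // a tag change ends the run
            System.out.printf("tag %d: rows [%d, %d)%n",
                prevTag, offset + runStart, offset + i);
            runStart = i;
            prevTag = tag;
          }
        } else if (started) {              // a null row also ends the run
          started = false;
          System.out.printf("tag %d: rows [%d, %d)%n",
              prevTag, offset + runStart, offset + i);
        }
      }
      if (started) {
        System.out.printf("tag %d: rows [%d, %d)%n",
            prevTag, offset + runStart, offset + length);
      }
    }

For tags {0, 0, 1} with no nulls, this prints tag 0: rows [0, 2) and then
tag 1: rows [2, 3).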
+
+ @Override
void writeStripe(OrcProto.StripeFooter.Builder builder,
int requiredIndexEntries) throws IOException {
super.writeStripe(builder, requiredIndexEntries);
@@ -2365,7 +3127,31 @@ public class WriterImpl implements Writer, MemoryManager.Callback {
createRowIndexEntry();
}
}
- memoryManager.addedRow();
+ memoryManager.addedRow(1);
+ }
+
+ @Override
+ public void addRowBatch(VectorizedRowBatch batch) throws IOException {
+ if (buildIndex) {
+ // Chunk the batch at rowIndexStride boundaries so that each row
+ // index entry covers exactly rowIndexStride rows.
+ int posn = 0;
+ while (posn < batch.size) {
+ int chunkSize = Math.min(batch.size - posn,
+ rowIndexStride - rowsInIndex);
+ treeWriter.writeRootBatch(batch, posn, chunkSize);
+ posn += chunkSize;
+ rowsInIndex += chunkSize;
+ rowsInStripe += chunkSize;
+ if (rowsInIndex >= rowIndexStride) {
+ createRowIndexEntry();
+ }
+ }
+ } else {
+ rowsInStripe += batch.size;
+ treeWriter.writeRootBatch(batch, 0, batch.size);
+ }
+ memoryManager.addedRow(batch.size);
}
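The chunking arithmetic in addRowBatch is easy to trace with concrete
numbers. A self-contained demo under a hypothetical stride of 10,000;
createRowIndexEntry(), which resets the per-entry count in the real writer,
is inlined as a reset:

    public class StrideDemo {
      public static void main(String[] args) {
        int rowIndexStride = 10_000;   // hypothetical configuration
        int rowsInIndex = 9_000;       // rows already in the open index entry
        int batchSize = 2_500;
        int posn = 0;
        while (posn < batchSize) {
          int chunkSize = Math.min(batchSize - posn,
                                   rowIndexStride - rowsInIndex);
          System.out.printf("write rows [%d, %d)%n", posn, posn + chunkSize);
          posn += chunkSize;
          rowsInIndex += chunkSize;
          if (rowsInIndex >= rowIndexStride) {
            System.out.println("createRowIndexEntry()");
            rowsInIndex = 0;           // the real method resets this counter
          }
        }
      }
    }

It prints write rows [0, 1000), then createRowIndexEntry(), then
write rows [1000, 2500): exactly one index-entry boundary falls inside the
batch.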
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
index f6111e8..a51177e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestColumnStatistics.java
@@ -52,17 +52,16 @@ public class TestColumnStatistics {
ColumnStatisticsImpl stats1 = ColumnStatisticsImpl.create(schema);
ColumnStatisticsImpl stats2 = ColumnStatisticsImpl.create(schema);
- stats1.updateInteger(10);
- stats1.updateInteger(10);
- stats2.updateInteger(1);
- stats2.updateInteger(1000);
+ stats1.updateInteger(10, 2);
+ stats2.updateInteger(1, 1);
+ stats2.updateInteger(1000, 1);
stats1.merge(stats2);
IntegerColumnStatistics typed = (IntegerColumnStatistics) stats1;
assertEquals(1, typed.getMinimum());
assertEquals(1000, typed.getMaximum());
stats1.reset();
- stats1.updateInteger(-10);
- stats1.updateInteger(10000);
+ stats1.updateInteger(-10, 1);
+ stats1.updateInteger(10000, 1);
stats1.merge(stats2);
assertEquals(-10, typed.getMinimum());
assertEquals(10000, typed.getMaximum());
@@ -101,11 +100,14 @@ public class TestColumnStatistics {
stats1.updateString(new Text("david"));
stats1.updateString(new Text("charles"));
stats2.updateString(new Text("anne"));
- stats2.updateString(new Text("erin"));
+ byte[] erin = new byte[]{0, 1, 2, 3, 4, 5, 101, 114, 105, 110};
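+ // bytes 6..9 spell "erin" in UTF-8 and are counted 5 times, so the
+ // sum below is 4 ("anne") + 4 * 5 = 24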
+ stats2.updateString(erin, 6, 4, 5);
+ assertEquals(24, ((StringColumnStatistics)stats2).getSum());
stats1.merge(stats2);
StringColumnStatistics typed = (StringColumnStatistics) stats1;
assertEquals("anne", typed.getMinimum());
assertEquals("erin", typed.getMaximum());
+ assertEquals(39, typed.getSum());
stats1.reset();
stats1.updateString(new Text("aaa"));
stats1.updateString(new Text("zzz"));
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
index fb6be16..19aaff3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestMemoryManager.java
@@ -122,7 +122,7 @@ public class TestMemoryManager {
}
// add enough rows to get the memory manager to check the limits
for(int i=0; i < 10000; ++i) {
- mgr.addedRow();
+ mgr.addedRow(1);
}
for(int call=0; call < calls.length; ++call) {
verify(calls[call], times(2))
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
index e78f7aa..146f5b1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
@@ -1818,8 +1818,9 @@ public class TestOrcFile {
}
@Override
- void addedRow() throws IOException {
- if (++rows % 100 == 0) {
+ void addedRow(int count) throws IOException {
+ rows += count;
+ if (rows % 100 == 0) {
callback.checkMemory(rate);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f5f9f30d/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 797bbfb..15ee24c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -815,8 +815,10 @@ public class TestOrcRawRecordMerger {
MemoryManager mgr = new MemoryManager(conf){
int rowsAddedSinceCheck = 0;
- synchronized void addedRow() throws IOException {
- if (++rowsAddedSinceCheck >= 2) {
+ @Override
+ synchronized void addedRow(int rows) throws IOException {
+ rowsAddedSinceCheck += rows;
+ if (rowsAddedSinceCheck >= 2) {
notifyWriters();
rowsAddedSinceCheck = 0;
}
@@ -912,8 +914,10 @@ public class TestOrcRawRecordMerger {
MemoryManager mgr = new MemoryManager(conf){
int rowsAddedSinceCheck = 0;
- synchronized void addedRow() throws IOException {
- if (++rowsAddedSinceCheck >= 2) {
+ @Override
+ synchronized void addedRow(int rows) throws IOException {
+ rowsAddedSinceCheck += rows;
+ if (rowsAddedSinceCheck >= 2) {
notifyWriters();
rowsAddedSinceCheck = 0;
}