You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2017/05/18 22:44:02 UTC
[3/8] hive git commit: HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)
http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
index 1401ac3..ef77daf 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleSerializeWrite.java
@@ -22,6 +22,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
@@ -48,7 +52,6 @@ import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
import org.apache.hadoop.io.Text;
-import org.apache.hive.common.util.DateUtils;
/*
* Directly serialize, field-by-field, the LazyBinary format.
@@ -60,7 +63,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
private LazySerDeParameters lazyParams;
- private byte separator;
+ private byte[] separators;
private boolean[] needsEscape;
private boolean isEscaped;
private byte escapeChar;
@@ -70,6 +73,8 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
private int fieldCount;
private int index;
+ private int currentLevel;
+ private Deque<Integer> indexStack = new ArrayDeque<Integer>();
// For thread safety, we allocate private writable objects for our use only.
private DateWritable dateWritable;
@@ -80,14 +85,14 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
private byte[] decimalScratchBuffer;
public LazySimpleSerializeWrite(int fieldCount,
- byte separator, LazySerDeParameters lazyParams) {
+ LazySerDeParameters lazyParams) {
this();
this.fieldCount = fieldCount;
-
- this.separator = separator;
+
this.lazyParams = lazyParams;
+ separators = lazyParams.getSeparators();
isEscaped = lazyParams.isEscaped();
escapeChar = lazyParams.getEscapeChar();
needsEscape = lazyParams.getNeedsEscape();
@@ -106,6 +111,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
this.output = output;
output.reset();
index = 0;
+ currentLevel = 0;
}
/*
@@ -115,6 +121,7 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
public void setAppend(Output output) {
this.output = output;
index = 0;
+ currentLevel = 0;
}
/*
@@ -124,35 +131,19 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
public void reset() {
output.reset();
index = 0;
+ currentLevel = 0;
}
/*
- * General Pattern:
- *
- * if (index > 0) {
- * output.write(separator);
- * }
- *
- * WHEN NOT NULL: Write value.
- * OTHERWISE NULL: Write nullSequenceBytes.
- *
- * Increment index
- *
- */
-
- /*
* Write a NULL field.
*/
@Override
public void writeNull() throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
+ beginPrimitive();
output.write(nullSequenceBytes);
- index++;
+ finishPrimitive();
}
/*
@@ -160,18 +151,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeBoolean(boolean v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (v) {
output.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
} else {
output.write(LazyUtils.falseBytes, 0, LazyUtils.falseBytes.length);
}
-
- index++;
+ finishPrimitive();
}
/*
@@ -179,14 +165,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeByte(byte v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
LazyInteger.writeUTF8(output, v);
-
- index++;
+ finishPrimitive();
}
/*
@@ -194,14 +175,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeShort(short v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
LazyInteger.writeUTF8(output, v);
-
- index++;
+ finishPrimitive();
}
/*
@@ -209,14 +185,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeInt(int v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
LazyInteger.writeUTF8(output, v);
-
- index++;
+ finishPrimitive();
}
/*
@@ -224,14 +195,9 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeLong(long v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
LazyLong.writeUTF8(output, v);
-
- index++;
+ finishPrimitive();
}
/*
@@ -239,15 +205,10 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeFloat(float vf) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
ByteBuffer b = Text.encode(String.valueOf(vf));
output.write(b.array(), 0, b.limit());
-
- index++;
+ finishPrimitive();
}
/*
@@ -255,15 +216,10 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeDouble(double v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
ByteBuffer b = Text.encode(String.valueOf(v));
output.write(b.array(), 0, b.limit());
-
- index++;
+ finishPrimitive();
}
/*
@@ -274,28 +230,20 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeString(byte[] v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
+ beginPrimitive(); // Writes this level's separator unless this is the first value (see beginPrimitive).
+ if (v.equals(nullSequenceBytes)) { // NOTE(review): empty body, and byte[].equals is reference (not content) equality, so this branch has no effect; consider removing it.
}
-
LazyUtils.writeEscaped(output, v, 0, v.length, isEscaped, escapeChar,
needsEscape);
-
- index++;
+ finishPrimitive(); // Counts the value so the next one at this level is preceded by a separator.
}
@Override
public void writeString(byte[] v, int start, int length) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
LazyUtils.writeEscaped(output, v, start, length, isEscaped, escapeChar,
needsEscape);
-
- index++;
+ finishPrimitive();
}
/*
@@ -303,16 +251,11 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveChar(HiveChar hiveChar) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
ByteBuffer b = Text.encode(hiveChar.getPaddedValue());
LazyUtils.writeEscaped(output, b.array(), 0, b.limit(), isEscaped, escapeChar,
needsEscape);
-
- index++;
+ finishPrimitive();
}
/*
@@ -320,16 +263,11 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveVarchar(HiveVarchar hiveVarchar) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
ByteBuffer b = Text.encode(hiveVarchar.getValue());
LazyUtils.writeEscaped(output, b.array(), 0, b.limit(), isEscaped, escapeChar,
needsEscape);
-
- index++;
+ finishPrimitive();
}
/*
@@ -337,32 +275,22 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeBinary(byte[] v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
byte[] toEncode = new byte[v.length];
System.arraycopy(v, 0, toEncode, 0, v.length);
byte[] toWrite = Base64.encodeBase64(toEncode);
output.write(toWrite, 0, toWrite.length);
-
- index++;
+ finishPrimitive();
}
@Override
public void writeBinary(byte[] v, int start, int length) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
byte[] toEncode = new byte[length];
System.arraycopy(v, start, toEncode, 0, length);
byte[] toWrite = Base64.encodeBase64(toEncode);
output.write(toWrite, 0, toWrite.length);
-
- index++;
+ finishPrimitive();
}
/*
@@ -370,35 +298,25 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeDate(Date date) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (dateWritable == null) {
dateWritable = new DateWritable();
}
dateWritable.set(date);
LazyDate.writeUTF8(output, dateWritable);
-
- index++;
+ finishPrimitive();
}
// We provide a faster way to write a date without a Date object.
@Override
public void writeDate(int dateAsDays) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (dateWritable == null) {
dateWritable = new DateWritable();
}
dateWritable.set(dateAsDays);
LazyDate.writeUTF8(output, dateWritable);
-
- index++;
+ finishPrimitive();
}
/*
@@ -406,18 +324,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeTimestamp(Timestamp v) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (timestampWritable == null) {
timestampWritable = new TimestampWritable();
}
timestampWritable.set(v);
LazyTimestamp.writeUTF8(output, timestampWritable);
-
- index++;
+ finishPrimitive();
}
/*
@@ -425,35 +338,25 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (hiveIntervalYearMonthWritable == null) {
hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
}
hiveIntervalYearMonthWritable.set(viyt);
LazyHiveIntervalYearMonth.writeUTF8(output, hiveIntervalYearMonthWritable);
-
- index++;
+ finishPrimitive();
}
@Override
public void writeHiveIntervalYearMonth(int totalMonths) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (hiveIntervalYearMonthWritable == null) {
hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
}
hiveIntervalYearMonthWritable.set(totalMonths);
LazyHiveIntervalYearMonth.writeUTF8(output, hiveIntervalYearMonthWritable);
-
- index++;
+ finishPrimitive();
}
/*
@@ -461,18 +364,13 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException {
-
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (hiveIntervalDayTimeWritable == null) {
hiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable();
}
hiveIntervalDayTimeWritable.set(vidt);
LazyHiveIntervalDayTime.writeUTF8(output, hiveIntervalDayTimeWritable);
-
- index++;
+ finishPrimitive();
}
/*
@@ -483,29 +381,119 @@ public final class LazySimpleSerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException {
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (decimalScratchBuffer == null) {
decimalScratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
}
LazyHiveDecimal.writeUTF8(output, dec, scale, decimalScratchBuffer);
-
- index++;
+ finishPrimitive();
}
@Override
public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException {
- if (index > 0) {
- output.write(separator);
- }
-
+ beginPrimitive();
if (decimalScratchBuffer == null) {
decimalScratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES];
}
LazyHiveDecimal.writeUTF8(output, decWritable, scale, decimalScratchBuffer);
+ finishPrimitive();
+ }
+
+ private void beginComplex() { // Opens a nested value: writes the separator if needed, saves index, and descends one separator level.
+ if (index > 0) {
+ output.write(separators[currentLevel]);
+ }
+ indexStack.push(index); // Restored by finishComplex() when the nested value ends.
+
+ // Always use index 0 so the write methods don't write a separator.
+ index = 0;
+
+ // Items inside the nested value are delimited by the next entry of separators[].
+ currentLevel++;
+ }
+
+ private void finishComplex() { // Returns to the enclosing level and counts the nested value as one item there.
+ currentLevel--;
+ index = indexStack.pop();
+ index++;
+ }
+
+ @Override
+ public void beginList(List list) { // Element separators are written by the per-element write calls at the new level.
+ beginComplex();
+ }
+
+ @Override
+ public void separateList() { // Intentionally empty: each element's write call emits its own separator.
+ }
+
+ @Override
+ public void finishList() {
+ finishComplex();
+ }
+
+ @Override
+ public void beginMap(Map<?, ?> map) { // Entries are split by separators[currentLevel - 1], key from value by separators[currentLevel].
+ beginComplex();
+
+ // MAP needs a second level: one separator between entries, a deeper one between key and value.
+ currentLevel++;
+ }
+
+ @Override
+ public void separateKey() { // Writes the key/value separator; index is reset so the value adds none.
+ index = 0;
+ output.write(separators[currentLevel]);
+ }
+
+ @Override
+ public void separateKeyValuePair() { // Writes the entry separator (one level shallower); index is reset for the next key.
+ index = 0;
+ output.write(separators[currentLevel - 1]);
+ }
+
+ @Override
+ public void finishMap() {
+ // Undo the extra level added by beginMap.
+ currentLevel--;
+
+ finishComplex();
+ }
+
+ @Override
+ public void beginStruct(List fieldValues) {
+ beginComplex();
+ }
+
+ @Override
+ public void separateStruct() { // Intentionally empty: each field's write call emits its own separator.
+ }
+
+ @Override
+ public void finishStruct() {
+ finishComplex();
+ }
+
+ @Override
+ public void beginUnion(int tag) throws IOException { // Writes the tag, then this level's separator; index is reset so the value adds none.
+ beginComplex();
+ writeInt(tag);
+ output.write(separators[currentLevel]);
+ index = 0;
+ }
+
+ @Override
+ public void finishUnion() {
+ finishComplex();
+ }
+
+ private void beginPrimitive() { // Writes this level's separator before every value except the first.
+ if (index > 0) {
+ output.write(separators[currentLevel]);
+ }
+ }
+ private void finishPrimitive() { // Counts the value just written at this level.
index++;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
index e94ae99..8e0a499 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinaryDeserializeRead.java
@@ -20,19 +20,28 @@ package org.apache.hadoop.hive.serde2.lazybinary.fast;
import java.io.EOFException;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VLong;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.WritableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/*
* Directly deserialize with the caller reading field-by-field the LazyBinary serialization format.
@@ -55,26 +64,84 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
private int start;
private int offset;
private int end;
- private int fieldCount;
- private int fieldStart;
- private int fieldIndex;
- private byte nullByte;
+
+ private boolean skipLengthPrefix = false;
// Object to receive results of reading a decoded variable length int or long.
private VInt tempVInt;
private VLong tempVLong;
+ private Deque<Field> stack = new ArrayDeque<>();
+ private Field root;
+
+ private class Field { // Read state for one node of the type tree built by createField(). NOTE(review): uses no outer-instance state, so it could be a static nested class.
+ Field[] children; // LIST: 1, MAP: 2 (key, value), STRUCT/UNION: one per member; null for PRIMITIVE.
+
+ Category category;
+ PrimitiveCategory primitiveCategory; // Set only for PRIMITIVE.
+ TypeInfo typeInfo;
+
+ int index; // Position of the next child value to read.
+ int count; // Number of child values; for MAP, 2 x the entry count.
+ int start; // Offset where this value's header starts.
+ int end; // Offset just past this value, from its length prefix (not set when skipLengthPrefix is true).
+ int nullByteStart; // LIST/MAP: offset of the first null-flag byte.
+ byte nullByte; // STRUCT: null-flag byte for the current group of 8 fields.
+ byte tag; // UNION: presumably the current alternative's tag (assigned outside this view) — confirm.
+ }
+
public LazyBinaryDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer) {
super(typeInfos, useExternalBuffer);
- fieldCount = typeInfos.length;
tempVInt = new VInt();
tempVLong = new VLong();
currentExternalBufferNeeded = false;
+
+ root = new Field(); // The row is modeled as a top-level STRUCT whose children are the columns.
+ root.category = Category.STRUCT;
+ root.children = createFields(typeInfos);
+ root.count = typeInfos.length;
}
- // Not public since we must have the field count so every 8 fields NULL bytes can be navigated.
- private LazyBinaryDeserializeRead() {
- super();
+ private Field[] createFields(TypeInfo[] typeInfos) { // Builds one Field tree per TypeInfo, in order.
+ final Field[] children = new Field[typeInfos.length];
+ for (int i = 0; i < typeInfos.length; i++) {
+ children[i] = createField(typeInfos[i]);
+ }
+ return children;
+ }
+
+ private Field createField(TypeInfo typeInfo) { // Builds the Field for one TypeInfo, recursing into LIST/MAP/STRUCT/UNION members.
+ final Field field = new Field();
+ final Category category = typeInfo.getCategory();
+ field.category = category;
+ field.typeInfo = typeInfo;
+ switch (category) {
+ case PRIMITIVE:
+ field.primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
+ break;
+ case LIST:
+ field.children = new Field[1];
+ field.children[0] = createField(((ListTypeInfo) typeInfo).getListElementTypeInfo());
+ break;
+ case MAP:
+ field.children = new Field[2]; // [0] = key, [1] = value.
+ field.children[0] = createField(((MapTypeInfo) typeInfo).getMapKeyTypeInfo());
+ field.children[1] = createField(((MapTypeInfo) typeInfo).getMapValueTypeInfo());
+ break;
+ case STRUCT:
+ final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ final List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+ field.children = createFields(fieldTypeInfos.toArray(new TypeInfo[fieldTypeInfos.size()]));
+ break;
+ case UNION:
+ final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+ final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos();
+ field.children = createFields(objectTypeInfos.toArray(new TypeInfo[objectTypeInfos.size()]));
+ break;
+ default:
+ throw new RuntimeException(); // NOTE(review): no message; including the unexpected category would make failures diagnosable.
+ }
+ return field;
}
/*
@@ -86,7 +153,20 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
this.offset = offset;
start = offset;
end = offset + length;
- fieldIndex = 0;
+
+ stack.clear();
+ stack.push(root);
+ clearIndex(root);
+ }
+
+ private void clearIndex(Field field) { // Recursively resets the read position of field and all of its descendants.
+ field.index = 0;
+ if (field.children == null) { // PRIMITIVE fields have no children.
+ return;
+ }
+ for (Field child : field.children) {
+ clearIndex(child);
+ }
}
/*
@@ -102,13 +182,13 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
sb.append(" for length ");
sb.append(end - start);
sb.append(" to read ");
- sb.append(fieldCount);
+ sb.append(root.children.length);
sb.append(" fields with types ");
sb.append(Arrays.toString(typeInfos));
sb.append(". Read field #");
- sb.append(fieldIndex);
+ sb.append(root.index);
sb.append(" at field start position ");
- sb.append(fieldStart);
+ sb.append(root.start);
sb.append(" current read offset ");
sb.append(offset);
@@ -127,263 +207,196 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
*/
@Override
public boolean readNextField() throws IOException {
- if (fieldIndex >= fieldCount) {
- return false;
- }
-
- fieldStart = offset;
+ return readComplexField();
+ }
- if (fieldIndex == 0) {
- // The rest of the range check for fields after the first is below after checking
- // the NULL byte.
- if (offset >= end) {
+ private boolean readPrimitive(Field field) throws IOException {
+ final PrimitiveCategory primitiveCategory = field.primitiveCategory;
+ final TypeInfo typeInfo = field.typeInfo;
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ // No check needed for single byte read.
+ currentBoolean = (bytes[offset++] != 0);
+ break;
+ case BYTE:
+ // No check needed for single byte read.
+ currentByte = bytes[offset++];
+ break;
+ case SHORT:
+ // Last item -- ok to be at end.
+ if (offset + 2 > end) {
throw new EOFException();
}
- nullByte = bytes[offset++];
- }
-
- // NOTE: The bit is set to 1 if a field is NOT NULL. boolean isNull;
- if ((nullByte & (1 << (fieldIndex % 8))) == 0) {
-
- // Logically move past this field.
- fieldIndex++;
-
- // Every 8 fields we read a new NULL byte.
- if (fieldIndex < fieldCount) {
- if ((fieldIndex % 8) == 0) {
- // Get next null byte.
- if (offset >= end) {
- throw new EOFException();
- }
- nullByte = bytes[offset++];
- }
+ currentShort = LazyBinaryUtils.byteArrayToShort(bytes, offset);
+ offset += 2;
+ break;
+ case INT:
+ // Parse the first byte of a vint/vlong to determine the number of bytes.
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+ throw new EOFException();
}
- return false;
- } else {
-
- // Make sure there is at least one byte that can be read for a value.
- if (offset >= end) {
+ LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+ offset += tempVInt.length;
+ currentInt = tempVInt.value;
+ break;
+ case LONG:
+ // Parse the first byte of a vint/vlong to determine the number of bytes.
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
throw new EOFException();
}
-
- /*
- * We have a field and are positioned to it. Read it.
- */
- switch (primitiveCategories[fieldIndex]) {
- case BOOLEAN:
- // No check needed for single byte read.
- currentBoolean = (bytes[offset++] != 0);
- break;
- case BYTE:
- // No check needed for single byte read.
- currentByte = bytes[offset++];
- break;
- case SHORT:
- // Last item -- ok to be at end.
- if (offset + 2 > end) {
- throw new EOFException();
- }
- currentShort = LazyBinaryUtils.byteArrayToShort(bytes, offset);
- offset += 2;
- break;
- case INT:
+ LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
+ offset += tempVLong.length;
+ currentLong = tempVLong.value;
+ break;
+ case FLOAT:
+ // Last item -- ok to be at end.
+ if (offset + 4 > end) {
+ throw new EOFException();
+ }
+ currentFloat = Float.intBitsToFloat(LazyBinaryUtils.byteArrayToInt(bytes, offset));
+ offset += 4;
+ break;
+ case DOUBLE:
+ // Last item -- ok to be at end.
+ if (offset + 8 > end) {
+ throw new EOFException();
+ }
+ currentDouble = Double.longBitsToDouble(LazyBinaryUtils.byteArrayToLong(bytes, offset));
+ offset += 8;
+ break;
+
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ {
+ // using vint instead of 4 bytes
// Parse the first byte of a vint/vlong to determine the number of bytes.
if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
throw new EOFException();
}
LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
offset += tempVInt.length;
- currentInt = tempVInt.value;
- break;
- case LONG:
- // Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
- throw new EOFException();
- }
- LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
- offset += tempVLong.length;
- currentLong = tempVLong.value;
- break;
- case FLOAT:
+
+ int saveStart = offset;
+ int length = tempVInt.value;
+ offset += length;
// Last item -- ok to be at end.
- if (offset + 4 > end) {
+ if (offset > end) {
throw new EOFException();
}
- currentFloat = Float.intBitsToFloat(LazyBinaryUtils.byteArrayToInt(bytes, offset));
- offset += 4;
- break;
- case DOUBLE:
+
+ currentBytes = bytes;
+ currentBytesStart = saveStart;
+ currentBytesLength = length;
+ }
+ break;
+ case DATE:
+ // Parse the first byte of a vint/vlong to determine the number of bytes.
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+ throw new EOFException();
+ }
+ LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+ offset += tempVInt.length;
+
+ currentDateWritable.set(tempVInt.value);
+ break;
+ case TIMESTAMP:
+ {
+ int length = TimestampWritable.getTotalLength(bytes, offset);
+ int saveStart = offset;
+ offset += length;
// Last item -- ok to be at end.
- if (offset + 8 > end) {
+ if (offset > end) {
throw new EOFException();
}
- currentDouble = Double.longBitsToDouble(LazyBinaryUtils.byteArrayToLong(bytes, offset));
- offset += 8;
- break;
-
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- {
- // using vint instead of 4 bytes
- // Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
- throw new EOFException();
- }
- LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
- offset += tempVInt.length;
-
- int saveStart = offset;
- int length = tempVInt.value;
- offset += length;
- // Last item -- ok to be at end.
- if (offset > end) {
- throw new EOFException();
- }
-
- currentBytes = bytes;
- currentBytesStart = saveStart;
- currentBytesLength = length;
- }
- break;
- case DATE:
+
+ currentTimestampWritable.set(bytes, saveStart);
+ }
+ break;
+ case INTERVAL_YEAR_MONTH:
+ // Parse the first byte of a vint/vlong to determine the number of bytes.
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+ throw new EOFException();
+ }
+ LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+ offset += tempVInt.length;
+
+ currentHiveIntervalYearMonthWritable.set(tempVInt.value);
+ break;
+ case INTERVAL_DAY_TIME:
+ // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
+ // Parse the first byte of a vint/vlong to determine the number of bytes.
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
+ throw new EOFException();
+ }
+ LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
+ offset += tempVLong.length;
+
+ // Parse the first byte of a vint/vlong to determine the number of bytes.
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+ throw new EOFException();
+ }
+ LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+ offset += tempVInt.length;
+
+ currentHiveIntervalDayTimeWritable.set(tempVLong.value, tempVInt.value);
+ break;
+ case DECIMAL:
+ {
+ // Since enforcing precision and scale can cause a HiveDecimal to become NULL,
+ // we must read it, enforce it here, and either return NULL or buffer the result.
+
+ // These calls are to see how much data there is. The setFromBytes call below will do the same
+ // readVInt reads but actually unpack the decimal.
+
+ // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
// Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
+ if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
throw new EOFException();
}
LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
offset += tempVInt.length;
+ int readScale = tempVInt.value;
- currentDateWritable.set(tempVInt.value);
- break;
- case TIMESTAMP:
- {
- int length = TimestampWritable.getTotalLength(bytes, offset);
- int saveStart = offset;
- offset += length;
- // Last item -- ok to be at end.
- if (offset > end) {
- throw new EOFException();
- }
-
- currentTimestampWritable.set(bytes, saveStart);
- }
- break;
- case INTERVAL_YEAR_MONTH:
// Parse the first byte of a vint/vlong to determine the number of bytes.
if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
throw new EOFException();
}
LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
offset += tempVInt.length;
-
- currentHiveIntervalYearMonthWritable.set(tempVInt.value);
- break;
- case INTERVAL_DAY_TIME:
- // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
- // Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
+ int saveStart = offset;
+ offset += tempVInt.value;
+ // Last item -- ok to be at end.
+ if (offset > end) {
throw new EOFException();
}
- LazyBinaryUtils.readVLong(bytes, offset, tempVLong);
- offset += tempVLong.length;
+ int length = offset - saveStart;
- // Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
- throw new EOFException();
- }
- LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
- offset += tempVInt.length;
+ // scale = 2, length = 6, value = -6065716379.11
+ // \002\006\255\114\197\131\083\105
+ // \255\114\197\131\083\105
- currentHiveIntervalDayTimeWritable.set(tempVLong.value, tempVInt.value);
- break;
- case DECIMAL:
- {
- // Since enforcing precision and scale can cause a HiveDecimal to become NULL,
- // we must read it, enforce it here, and either return NULL or buffer the result.
-
- // These calls are to see how much data there is. The setFromBytes call below will do the same
- // readVInt reads but actually unpack the decimal.
-
- // The first bounds check requires at least one more byte beyond for 2nd int (hence >=).
- // Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) >= end) {
- throw new EOFException();
- }
- LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
- offset += tempVInt.length;
- int readScale = tempVInt.value;
-
- // Parse the first byte of a vint/vlong to determine the number of bytes.
- if (offset + WritableUtils.decodeVIntSize(bytes[offset]) > end) {
- throw new EOFException();
- }
- LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
- offset += tempVInt.length;
- int saveStart = offset;
- offset += tempVInt.value;
- // Last item -- ok to be at end.
- if (offset > end) {
- throw new EOFException();
- }
- int length = offset - saveStart;
-
- // scale = 2, length = 6, value = -6065716379.11
- // \002\006\255\114\197\131\083\105
- // \255\114\197\131\083\105
-
- currentHiveDecimalWritable.setFromBigIntegerBytesAndScale(
- bytes, saveStart, length, readScale);
- boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
- if (!decimalIsNull) {
-
- DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
-
- int precision = decimalTypeInfo.getPrecision();
- int scale = decimalTypeInfo.getScale();
-
- decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
- }
- if (decimalIsNull) {
-
- // Logically move past this field.
- fieldIndex++;
-
- // Every 8 fields we read a new NULL byte.
- if (fieldIndex < fieldCount) {
- if ((fieldIndex % 8) == 0) {
- // Get next null byte.
- if (offset >= end) {
- throw new EOFException();
- }
- nullByte = bytes[offset++];
- }
- }
- return false;
- }
- }
- break;
+ currentHiveDecimalWritable.setFromBigIntegerBytesAndScale(
+ bytes, saveStart, length, readScale);
+ boolean decimalIsNull = !currentHiveDecimalWritable.isSet();
+ if (!decimalIsNull) {
- default:
- throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
- }
- }
+ final DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
- // Logically move past this field.
- fieldIndex++;
+ final int precision = decimalTypeInfo.getPrecision();
+ final int scale = decimalTypeInfo.getScale();
- // Every 8 fields we read a new NULL byte.
- if (fieldIndex < fieldCount) {
- if ((fieldIndex % 8) == 0) {
- // Get next null byte.
- if (offset >= end) {
- throw new EOFException();
+ decimalIsNull = !currentHiveDecimalWritable.mutateEnforcePrecisionScale(precision, scale);
+ }
+ if (decimalIsNull) {
+ return false;
}
- nullByte = bytes[offset++];
}
+ break;
+ default:
+ throw new Error("Unexpected primitive category " + primitiveCategory.name());
}
-
return true;
}
@@ -394,8 +407,37 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
* Designed for skipping columns that are not included.
*/
public void skipNextField() throws IOException {
- // Not a known use case for LazyBinary -- so don't optimize.
- readNextField();
+ // Advance past the next child of the field on top of the stack without exposing its value.
+ final Field current = stack.peek();
+ final boolean isNull = isNull(current);
+
+ if (isNull) {
+ // A null child has no data bytes; only the child index moves.
+ current.index++;
+ return;
+ }
+
+ if (readUnionTag(current)) {
+ // The union tag byte was consumed; the selected value follows as the next child.
+ current.index++;
+ return;
+ }
+
+ final Field child = getChild(current);
+
+ if (child.category == Category.PRIMITIVE) {
+ // NOTE(review): the primitive is fully decoded even though it is being skipped.
+ readPrimitive(child);
+ current.index++;
+ } else {
+ // Complex child: read its header, skip each of its children recursively, then pop it.
+ // finishComplexVariableFieldsType() also advances current.index.
+ // NOTE(review): when length prefixes are present, offset could presumably jump to child.end.
+ parseHeader(child);
+ stack.push(child);
+
+ for (int i = 0; i < child.count; i++) {
+ skipNextField();
+ }
+ finishComplexVariableFieldsType();
+ }
+
+ if (offset > end) {
+ throw new EOFException();
+ }
}
/*
@@ -412,4 +454,141 @@ public final class LazyBinaryDeserializeRead extends DeserializeRead {
public boolean isEndOfInputReached() {
+ // True once the read offset sits exactly at the end of the input range.
return (offset == end);
}
+
+ private boolean isNull(Field field) {
+ // Returns true when the child of field at field.index is null.
+ // For a STRUCT parent this also consumes the next null byte from the input
+ // whenever field.index starts a new group of eight fields.
+ final byte b = (byte) (1 << (field.index % 8));
+ switch (field.category) {
+ case PRIMITIVE:
+ return false;
+ case LIST:
+ case MAP:
+ // parseHeader() recorded where this container's null bits begin.
+ final byte nullByte = bytes[field.nullByteStart + (field.index / 8)];
+ return (nullByte & b) == 0;
+ case STRUCT:
+ if (field.index % 8 == 0) {
+ field.nullByte = bytes[offset++];
+ }
+ return (field.nullByte & b) == 0;
+ case UNION:
+ // Neither the tag nor the selected value has a null bit.
+ return false;
+ default:
+ throw new IllegalStateException("Unexpected category " + field.category);
+ }
+ }
+
+ private void parseHeader(Field field) {
+ // Reset the per-field cursor before any of its children are read.
+ field.start = offset;
+ field.index = 0;
+
+ // The 4-byte total length is present only when length prefixes are enabled.
+ if (!skipLengthPrefix) {
+ final int byteLength = LazyBinaryUtils.byteArrayToInt(bytes, offset);
+ offset += 4;
+ field.end = offset + byteLength;
+ }
+
+ final Category category = field.category;
+ if (category == Category.LIST || category == Category.MAP) {
+ // Element count; a map stores key and value as two consecutive children.
+ LazyBinaryUtils.readVInt(bytes, offset, tempVInt);
+ offset += tempVInt.length;
+ final int elements = tempVInt.value;
+ field.count = (category == Category.MAP) ? elements * 2 : elements;
+
+ // One null bit per child, packed eight to a byte, directly after the count.
+ field.nullByteStart = offset;
+ offset += (field.count + 7) / 8;
+ } else if (category == Category.STRUCT) {
+ field.count = ((StructTypeInfo) field.typeInfo).getAllStructFieldTypeInfos().size();
+ } else if (category == Category.UNION) {
+ // Tag slot plus the single selected value.
+ field.count = 2;
+ }
+ }
+
+ private Field getChild(Field field) {
+ // Returns the Field describing the child of field at field.index.
+ switch (field.category) {
+ case LIST:
+ // All elements share one element descriptor.
+ return field.children[0];
+ case MAP:
+ // Keys and values alternate: even index = key, odd index = value.
+ return field.children[field.index % 2];
+ case STRUCT:
+ return field.children[field.index];
+ case UNION:
+ // The tag read by readUnionTag() selects the alternative.
+ return field.children[field.tag];
+ default:
+ throw new IllegalStateException("Unexpected category " + field.category);
+ }
+ }
+
+ private boolean readUnionTag(Field field) {
+ // Only the first child slot of a union holds the tag byte.
+ final boolean atTagSlot = field.category == Category.UNION && field.index == 0;
+ if (!atTagSlot) {
+ return false;
+ }
+ // Consume the tag and expose it to the caller through currentInt.
+ field.tag = bytes[offset++];
+ currentInt = field.tag;
+ return true;
+ }
+
+ // Push or next
+ @Override
+ public boolean readComplexField() throws IOException {
+ final Field parent = stack.peek();
+
+ // A null child occupies no data bytes: advance past its slot and report "no value".
+ if (isNull(parent)) {
+ parent.index++;
+ return false;
+ }
+
+ // For a union, the first slot is the tag, exposed through currentInt.
+ if (readUnionTag(parent)) {
+ parent.index++;
+ return true;
+ }
+
+ final Field child = getChild(parent);
+ boolean hasValue = true;
+ if (child.category != Category.PRIMITIVE) {
+ // Descend: subsequent calls read the complex child's own children.
+ parseHeader(child);
+ stack.push(child);
+ } else {
+ hasValue = readPrimitive(child);
+ parent.index++;
+ }
+
+ if (offset > end) {
+ throw new EOFException();
+ }
+ return hasValue;
+ }
+
+ // Pop (list, map)
+ @Override
+ public boolean isNextComplexMultiValue() {
+ final Field container = stack.peek();
+ if (container.index < container.count) {
+ return true;
+ }
+ // Every child consumed: leave the container and count it as one child of its parent.
+ stack.pop();
+ stack.peek().index++;
+ return false;
+ }
+
+ // Pop (struct, union)
+ @Override
+ public void finishComplexVariableFieldsType() {
+ // Leave the finished field and count it as one consumed child of its parent.
+ stack.pop();
+ final Field parent = stack.peek();
+ if (parent == null) {
+ throw new IllegalStateException("No enclosing field after finishing a complex field");
+ }
+ parent.index++;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
index 085d71c..e50ff5e 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazybinary/fast/LazyBinarySerializeWrite.java
@@ -21,7 +21,13 @@ package org.apache.hadoop.hive.serde2.lazybinary.fast;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.type.HiveChar;
@@ -38,7 +44,11 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
-import org.apache.hive.common.util.DateUtils;
+
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.LIST;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.MAP;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.STRUCT;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.UNION;
/*
* Directly serialize, field-by-field, the LazyBinary format.
@@ -50,10 +60,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
private Output output;
- private int fieldCount;
- private int fieldIndex;
- private byte nullByte;
- private long nullOffset;
+ // Number of top-level fields per row; copied into the root STRUCT by resetWithoutOutput().
+ private int rootFieldCount;
+ // When true, lists/maps/structs/unions are written without their leading 4-byte size.
+ // NOTE(review): never set to true in the code shown here.
+ private boolean skipLengthPrefix = false;
// For thread safety, we allocate private writable objects for our use only.
private TimestampWritable timestampWritable;
@@ -64,10 +72,30 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
private long[] scratchLongs;
private byte[] scratchBuffer;
+ // Field for the top-level row; the bottom entry of stack.
+ private Field root;
+ // Open containers, innermost on top (peek()); the row's root STRUCT is at the bottom.
+ private Deque<Field> stack = new ArrayDeque<>();
+ // NOTE(review): reset to null by resetWithoutOutput() and not assigned elsewhere in the
+ // code shown, so beginMap()'s null-key warning looks unreachable -- confirm.
+ private LazyBinarySerDe.BooleanRef warnedOnceNullMapKey;
+
+ /* Bookkeeping for one open container: the row itself or a nested list/map/struct/union. */
+ private static class Field {
+ // Container category; only STRUCT containers track per-field null bits while writing.
+ Category type;
+
+ // Declared number of children; finishElement() compares fieldIndex against it.
+ int fieldCount;
+ // Index of the next STRUCT child to be written.
+ int fieldIndex;
+ // Output offset of the reserved 4-byte size slot.
+ int byteSizeStart;
+ // Output offset where the payload begins (just after the size slot).
+ int start;
+ // Output offset of the STRUCT null byte currently being filled.
+ long nullOffset;
+ // Pending not-null bits for the current group of eight STRUCT fields.
+ byte nullByte;
+
+ Field(Category category) {
+ type = category;
+ }
+ }
+
public LazyBinarySerializeWrite(int fieldCount) {
this();
vLongBytes = new byte[LazyBinaryUtils.VLONG_BYTES_LEN];
- this.fieldCount = fieldCount;
+ this.rootFieldCount = fieldCount;
+ // Create the top-level STRUCT field that tracks the row's null bytes.
+ resetWithoutOutput();
}
// Not public since we must have the field count and other information.
@@ -81,9 +109,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
public void set(Output output) {
this.output = output;
output.reset();
- fieldIndex = 0;
- nullByte = 0;
- nullOffset = 0;
+ // The buffer is now empty, so the row's first null byte goes at offset 0 (the default).
+ resetWithoutOutput();
}
/*
@@ -92,9 +118,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
@Override
public void setAppend(Output output) {
this.output = output;
- fieldIndex = 0;
- nullByte = 0;
- nullOffset = output.getLength();
+ resetWithoutOutput();
+ // Keep the existing bytes; the row's first null byte goes after them.
+ root.nullOffset = output.getLength();
}
/*
@@ -103,57 +128,45 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
@Override
public void reset() {
output.reset();
- fieldIndex = 0;
- nullByte = 0;
- nullOffset = 0;
+ // Discard the buffered bytes and every open-container state.
+ resetWithoutOutput();
}
- /*
- * General Pattern:
- *
- * // Every 8 fields we write a NULL byte.
- * IF ((fieldIndex % 8) == 0), then
- * IF (fieldIndex > 0), then
- * Write back previous NullByte
- * NullByte = 0
- * Remember write position
- * Allocate room for next NULL byte.
- *
- * WHEN NOT NULL: Set bit in NULL byte; Write value.
- * OTHERWISE NULL: We do not set a bit in the nullByte when we are writing a null.
- *
- * Increment fieldIndex
- *
- * IF (fieldIndex == fieldCount), then
- * Write back final NullByte
- *
- */
+ private void resetWithoutOutput() {
+ // Drop any containers left open and start again from a fresh top-level STRUCT.
+ stack.clear();
+ root = new Field(STRUCT);
+ root.fieldCount = rootFieldCount;
+ stack.push(root);
+ warnedOnceNullMapKey = null;
+ }
/*
* Write a NULL field.
*/
@Override
public void writeNull() throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
+ final Field current = stack.peek();
+
+ // Only STRUCT containers record nulls here; lists and maps already wrote their
+ // null bits in beginList()/beginMap().
+ if (current.type == STRUCT) {
+ // Every 8 fields we write a NULL byte.
+ if ((current.fieldIndex % 8) == 0) {
+ if (current.fieldIndex > 0) {
+ // Write back previous 8 field's NULL byte.
+ output.writeByte(current.nullOffset, current.nullByte);
+ current.nullByte = 0;
+ current.nullOffset = output.getLength();
+ }
+ // Allocate next NULL byte.
+ output.reserve(1);
}
- // Allocate next NULL byte.
- output.reserve(1);
- }
- // We DO NOT set a bit in the NULL byte when we are writing a NULL.
+ // We DO NOT set a bit in the NULL byte when we are writing a NULL.
- fieldIndex++;
+ current.fieldIndex++;
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
+ if (current.fieldIndex == current.fieldCount) {
+ // Write back the final NULL byte before the last fields.
+ output.writeByte(current.nullOffset, current.nullByte);
+ }
}
}
@@ -162,30 +175,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeBoolean(boolean v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // One byte: 1 for true, 0 for false.
output.write((byte) (v ? 1 : 0));
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -193,30 +185,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeByte(byte v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // The byte value is written as-is.
output.write(v);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -224,31 +195,10 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeShort(short v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Two bytes, high byte first.
output.write((byte) (v >> 8));
output.write((byte) (v));
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -256,30 +206,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeInt(int v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Variable-length encoding via writeVInt().
writeVInt(v);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -287,30 +216,9 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeLong(long v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Variable-length encoding via writeVLong().
writeVLong(v);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -318,34 +226,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeFloat(float vf) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // The float's raw bits (Float.floatToIntBits) as four bytes, most significant first.
int v = Float.floatToIntBits(vf);
output.write((byte) (v >> 24));
output.write((byte) (v >> 16));
output.write((byte) (v >> 8));
output.write((byte) (v));
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -353,97 +240,32 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeDouble(double v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Encoding is delegated to LazyBinaryUtils.writeDouble().
LazyBinaryUtils.writeDouble(output, v);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
* STRING.
- *
+ *
* Can be used to write CHAR and VARCHAR when the caller takes responsibility for
* truncation/padding issues.
*/
@Override
public void writeString(byte[] v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
- int length = v.length;
+ beginElement();
+ final int length = v.length;
+ // VInt byte length, then the raw bytes.
writeVInt(length);
-
output.write(v, 0, length);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
@Override
public void writeString(byte[] v, int start, int length) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // VInt byte length, then bytes v[start, start + length).
writeVInt(length);
-
output.write(v, start, length);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -451,8 +273,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveChar(HiveChar hiveChar) throws IOException {
- String string = hiveChar.getStrippedValue();
- byte[] bytes = string.getBytes();
+ final String string = hiveChar.getStrippedValue();
+ // Encode with an explicit charset: the no-argument String.getBytes() uses the JVM
+ // default charset, which would make the serialized bytes differ between hosts.
+ final byte[] bytes = string.getBytes(java.nio.charset.StandardCharsets.UTF_8);
writeString(bytes);
}
@@ -461,8 +283,8 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveVarchar(HiveVarchar hiveVarchar) throws IOException {
- String string = hiveVarchar.getValue();
- byte[] bytes = string.getBytes();
+ final String string = hiveVarchar.getValue();
+ // Encode with an explicit charset: the no-argument String.getBytes() uses the JVM
+ // default charset, which would make the serialized bytes differ between hosts.
+ final byte[] bytes = string.getBytes(java.nio.charset.StandardCharsets.UTF_8);
writeString(bytes);
}
@@ -484,59 +306,17 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeDate(Date date) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Stored as a VInt of the day count from DateWritable.dateToDays().
writeVInt(DateWritable.dateToDays(date));
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
// We provide a faster way to write a date without a Date object.
@Override
public void writeDate(int dateAsDays) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Same encoding as writeDate(Date): the day count as a VInt.
writeVInt(dateAsDays);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -544,34 +324,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeTimestamp(Timestamp v) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // A single lazily created TimestampWritable is reused to perform the encoding.
if (timestampWritable == null) {
timestampWritable = new TimestampWritable();
}
timestampWritable.set(v);
timestampWritable.writeToByteStream(output);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -579,66 +338,24 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveIntervalYearMonth(HiveIntervalYearMonth viyt) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // A single lazily created HiveIntervalYearMonthWritable is reused for the encoding.
if (hiveIntervalYearMonthWritable == null) {
hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
}
hiveIntervalYearMonthWritable.set(viyt);
hiveIntervalYearMonthWritable.writeToByteStream(output);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
@Override
public void writeHiveIntervalYearMonth(int totalMonths) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Same writable as the object overload, set from the raw month count.
if (hiveIntervalYearMonthWritable == null) {
hiveIntervalYearMonthWritable = new HiveIntervalYearMonthWritable();
}
hiveIntervalYearMonthWritable.set(totalMonths);
hiveIntervalYearMonthWritable.writeToByteStream(output);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -646,34 +363,13 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveIntervalDayTime(HiveIntervalDayTime vidt) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // A single lazily created HiveIntervalDayTimeWritable is reused for the encoding.
if (hiveIntervalDayTimeWritable == null) {
hiveIntervalDayTimeWritable = new HiveIntervalDayTimeWritable();
}
hiveIntervalDayTimeWritable.set(vidt);
hiveIntervalDayTimeWritable.writeToByteStream(output);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -684,22 +380,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
*/
@Override
public void writeHiveDecimal(HiveDecimal dec, int scale) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Scratch arrays are allocated on first use and reused by later calls.
if (scratchLongs == null) {
scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_BIG_INTEGER_BYTES];
@@ -709,33 +390,12 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
dec,
scratchLongs,
scratchBuffer);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
@Override
public void writeHiveDecimal(HiveDecimalWritable decWritable, int scale) throws IOException {
-
- // Every 8 fields we write a NULL byte.
- if ((fieldIndex % 8) == 0) {
- if (fieldIndex > 0) {
- // Write back previous 8 field's NULL byte.
- output.writeByte(nullOffset, nullByte);
- nullByte = 0;
- nullOffset = output.getLength();
- }
- // Allocate next NULL byte.
- output.reserve(1);
- }
-
- // Set bit in NULL byte when a field is NOT NULL.
- nullByte |= 1 << (fieldIndex % 8);
-
+ beginElement();
+ // Scratch arrays are allocated on first use and reused by later calls.
if (scratchLongs == null) {
scratchLongs = new long[HiveDecimal.SCRATCH_LONGS_LEN];
scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_BIG_INTEGER_BYTES];
@@ -745,13 +405,7 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
decWritable,
scratchLongs,
scratchBuffer);
-
- fieldIndex++;
-
- if (fieldIndex == fieldCount) {
- // Write back the final NULL byte before the last fields.
- output.writeByte(nullOffset, nullByte);
- }
+ finishElement();
}
/*
@@ -767,4 +421,241 @@ public class LazyBinarySerializeWrite implements SerializeWrite {
final int len = LazyBinaryUtils.writeVLongToByteArray(vLongBytes, v);
output.write(vLongBytes, 0, len);
}
+
+ @Override
+ public void beginList(List list) {
+ final Field listField = new Field(LIST);
+ beginComplex(listField);
+
+ final int elementCount = list.size();
+ listField.fieldCount = elementCount;
+
+ // Reserve a 4-byte slot for the list's byte size; finishList() fills it in.
+ if (!skipLengthPrefix) {
+ listField.byteSizeStart = output.getLength();
+ output.reserve(4);
+ listField.start = output.getLength();
+ }
+
+ // Element count as a VInt.
+ LazyBinaryUtils.writeVInt(output, elementCount);
+
+ // One bit per element (1 = not null), flushed every eight elements and after the last one.
+ byte bits = 0;
+ int bit = 0;
+ for (int i = 0; i < elementCount; i++) {
+ if (list.get(i) != null) {
+ bits |= 1 << bit;
+ }
+ bit++;
+ if (bit == 8 || i == elementCount - 1) {
+ output.write(bits);
+ bits = 0;
+ bit = 0;
+ }
+ }
+ }
+
+ @Override
+ public void separateList() {
+ // Nothing to write: elements follow one another with no separator bytes.
+ }
+
+ @Override
+ public void finishList() {
+ if (!skipLengthPrefix) {
+ // Back-patch the size slot reserved by beginList() with the payload length.
+ final Field listField = stack.peek();
+ writeSizeAtOffset(output, listField.byteSizeStart, output.getLength() - listField.start);
+ }
+ finishComplex();
+ }
+
+ @Override
+ public void beginMap(Map<?, ?> map) {
+ final Field current = new Field(MAP);
+ beginComplex(current);
+
+ if (!skipLengthPrefix) {
+ // 1/ reserve space for the byte size of the map,
+ // which is an integer and takes four bytes
+ current.byteSizeStart = output.getLength();
+ output.reserve(4);
+ current.start = output.getLength();
+ }
+
+ // 2/ write the size of the map which is a VInt
+ final int size = map.size();
+ // The entry count belongs in fieldCount (as in beginList()); fieldIndex is the
+ // running child cursor and must not hold the size.
+ current.fieldCount = size;
+ LazyBinaryUtils.writeVInt(output, size);
+
+ // 3/ write the null bytes: two bits per entry (key, then value), 1 = not null
+ int b = 0;
+ byte nullByte = 0;
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ // set the bit to 1 if a key is not null
+ if (null != entry.getKey()) {
+ nullByte |= 1 << (b % 8);
+ } else if (warnedOnceNullMapKey != null) {
+ // NOTE(review): warnedOnceNullMapKey is reset to null in resetWithoutOutput().
+ if (!warnedOnceNullMapKey.value) {
+ LOG.warn("Null map key encountered! Ignoring similar problems.");
+ }
+ warnedOnceNullMapKey.value = true;
+ }
+ b++;
+ // set the bit to 1 if a value is not null
+ if (null != entry.getValue()) {
+ nullByte |= 1 << (b % 8);
+ }
+ b++;
+ // write the byte to stream every 4 key-value pairs
+ // or if this is the last key-value pair
+ if (0 == b % 8 || b == size * 2) {
+ output.write(nullByte);
+ nullByte = 0;
+ }
+ }
+ }
+
+ @Override
+ public void separateKey() {
+ // Nothing to write: a value directly follows its key.
+ }
+
+ @Override
+ public void separateKeyValuePair() {
+ // Nothing to write: entries follow one another with no separator bytes.
+ }
+
+ @Override
+ public void finishMap() {
+ if (!skipLengthPrefix) {
+ // Back-patch the size slot reserved by beginMap() with the payload length.
+ final Field mapField = stack.peek();
+ writeSizeAtOffset(output, mapField.byteSizeStart, output.getLength() - mapField.start);
+ }
+ finishComplex();
+ }
+
+ @Override
+ public void beginStruct(List fieldValues) {
+ final Field structField = new Field(STRUCT);
+ beginComplex(structField);
+ structField.fieldCount = fieldValues.size();
+
+ if (!skipLengthPrefix) {
+ // Leave room for the 4-byte struct size; finishStruct() writes it.
+ structField.byteSizeStart = output.getLength();
+ output.reserve(4);
+ structField.start = output.getLength();
+ }
+
+ // The first field written reserves its null byte at this offset (see beginElement()).
+ structField.nullOffset = output.getLength();
+ }
+
+ @Override
+ public void separateStruct() {
+ // Nothing to write: struct fields follow one another with no separator bytes.
+ }
+
+ @Override
+ public void finishStruct() {
+ if (!skipLengthPrefix) {
+ // Back-patch the size slot reserved by beginStruct() with the payload length.
+ final Field structField = stack.peek();
+ writeSizeAtOffset(output, structField.byteSizeStart, output.getLength() - structField.start);
+ }
+ finishComplex();
+ }
+
+ @Override
+ public void beginUnion(int tag) throws IOException {
+ final Field current = new Field(UNION);
+ beginComplex(current);
+
+ // A union holds exactly one value: the alternative selected by tag.
+ current.fieldCount = 1;
+
+ if (!skipLengthPrefix) {
+ // 1/ reserve space for the byte size of the union,
+ // which is an integer and takes four bytes
+ current.byteSizeStart = output.getLength();
+ output.reserve(4);
+ current.start = output.getLength();
+ }
+
+ // 2/ write the union tag as a single byte; the selected value is written next
+ output.write(tag);
+ }
+
+ @Override
+ public void finishUnion() {
+ final Field current = stack.peek();
+
+ if (!skipLengthPrefix) {
+ // 3/ update the byte size of the union (tag byte plus value)
+ int typeEnd = output.getLength();
+ int typeSize = typeEnd - current.start;
+ writeSizeAtOffset(output, current.byteSizeStart, typeSize);
+ }
+
+ finishComplex();
+ }
+
+ private void beginElement() {
+ final Field parent = stack.peek();
+ if (parent.type != STRUCT) {
+ // Non-struct containers keep no per-field null bits here.
+ return;
+ }
+
+ final int slot = parent.fieldIndex % 8;
+ if (slot == 0) {
+ // A new group of eight fields starts: flush the previous group's null byte...
+ if (parent.fieldIndex > 0) {
+ output.writeByte(parent.nullOffset, parent.nullByte);
+ parent.nullByte = 0;
+ parent.nullOffset = output.getLength();
+ }
+ // ...and reserve the byte for this group.
+ output.reserve(1);
+ }
+
+ // Mark this field as present (NOT NULL).
+ parent.nullByte |= 1 << slot;
+ }
+
+ private void finishElement() {
+ final Field parent = stack.peek();
+ if (parent.type != STRUCT) {
+ return;
+ }
+
+ parent.fieldIndex++;
+ // After the last field, store the pending null byte into its reserved slot.
+ if (parent.fieldIndex == parent.fieldCount) {
+ output.writeByte(parent.nullOffset, parent.nullByte);
+ }
+ }
+
+ private void beginComplex(Field field) {
+ // Count the container as one element of its parent, then make it the innermost field.
+ beginElement();
+ stack.push(field);
+ }
+
+ private void finishComplex() {
+ // Close the innermost container, then finish it as an element of the parent.
+ stack.pop();
+ finishElement();
+ }
+
+ // Overwrites the 4-byte size slot at byteSizeStart with size.
+ private static void writeSizeAtOffset(
+ ByteStream.RandomAccessOutput byteStream, int byteSizeStart, int size) {
+ byteStream.writeInt(byteSizeStart, size);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
index f26c9ec..7b28682 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/StandardUnionObjectInspector.java
@@ -79,6 +79,31 @@ public class StandardUnionObjectInspector extends SettableUnionObjectInspector {
public String toString() {
+ // Formats as <tag>:<object>, for example 1:foo.
return tag + ":" + object;
}
+
+ @Override
+ public int hashCode() {
+ // Mix the tag into the value's hash; a null value contributes 0, leaving just the tag.
+ return java.util.Objects.hashCode(object) ^ tag;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof StandardUnion)) {
+ return false;
+ }
+ final StandardUnion other = (StandardUnion) obj;
+ // Equal when both select the same alternative and carry equal (or both null) values.
+ return tag == other.tag && java.util.Objects.equals(object, other.object);
+ }
}
/**