Posted to commits@hive.apache.org by se...@apache.org on 2016/09/19 22:41:54 UTC
[08/34] hive git commit: HIVE-13878: Vectorization: Column pruning for Text vectorization (Matt McCline, reviewed by Gopal Vijayaraghavan)
HIVE-13878: Vectorization: Column pruning for Text vectorization (Matt McCline, reviewed by Gopal Vijayaraghavan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0b62e6f3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0b62e6f3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0b62e6f3
Branch: refs/heads/hive-14535
Commit: 0b62e6f38788de81816abacf025d61bbc80d75fa
Parents: ff67cdd
Author: Matt McCline <mm...@hortonworks.com>
Authored: Tue Sep 13 23:15:56 2016 -0700
Committer: Matt McCline <mm...@hortonworks.com>
Committed: Tue Sep 13 23:15:56 2016 -0700
----------------------------------------------------------------------
.../ql/exec/vector/VectorDeserializeRow.java | 238 +++---
.../hive/ql/exec/vector/VectorMapOperator.java | 22 +-
.../fast/VectorMapJoinFastLongHashTable.java | 2 +-
.../fast/VectorMapJoinFastStringCommon.java | 2 +-
.../VectorMapJoinOptimizedLongCommon.java | 56 --
.../VectorMapJoinOptimizedStringCommon.java | 26 -
.../hive/ql/optimizer/physical/Vectorizer.java | 25 +-
.../hive/ql/exec/vector/TestVectorSerDeRow.java | 14 +-
.../mapjoin/fast/CheckFastRowHashMap.java | 10 +-
.../exec/vector/mapjoin/fast/VerifyFastRow.java | 2 +-
.../fast/BinarySortableDeserializeRead.java | 132 ++--
.../hive/serde2/fast/DeserializeRead.java | 71 +-
.../lazy/fast/LazySimpleDeserializeRead.java | 770 ++++++++++---------
.../fast/LazyBinaryDeserializeRead.java | 119 +--
.../apache/hadoop/hive/serde2/VerifyFast.java | 2 +-
.../binarysortable/TestBinarySortableFast.java | 35 +-
.../hive/serde2/lazy/TestLazySimpleFast.java | 31 +-
.../serde2/lazybinary/TestLazyBinaryFast.java | 32 +-
18 files changed, 787 insertions(+), 802 deletions(-)
----------------------------------------------------------------------
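The change that threads through all 18 files is the DeserializeRead protocol: readCheckNull()/extraFieldsCheck()/setColumnsToInclude() are replaced by readNextField()/skipNextField()/readField(int)/isEndOfInputReached(). A minimal before/after sketch (the variable "read" and the single LONG field are assumptions for illustration, not code from the patch):

  // Hedged sketch; "read" has had its input set via read.set(bytes, offset, length)
  // and its schema is assumed to be a single LONG field.
  static void readOneLong(org.apache.hadoop.hive.serde2.fast.DeserializeRead read)
      throws java.io.IOException {
    // Old protocol (removed): readCheckNull() returned true for NULL, and the
    // current* members were valid only when it returned false:
    //     if (!read.readCheckNull()) { long v = read.currentLong; }
    //     read.extraFieldsCheck();
    // New protocol: readNextField() returns true when a value was read into the
    // current* members; unwanted columns are passed over with skipNextField().
    if (read.readNextField()) {
      long v = read.currentLong;            // value present
    } else {
      // field was NULL
    }
    boolean consumedAll = read.isEndOfInputReached();
  }
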
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 47bef43..d31d338 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector;
import java.io.EOFException;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
@@ -97,20 +98,27 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
* We say "source" because when there is conversion we are converting th deserialized source into
* a target data type.
*/
- boolean[] isConvert;
+
+ private boolean useReadField;
+ // True when the (random access) readField method of DeserializeRead is being used.
+
+ private int[] readFieldLogicalIndices;
+ // The logical indices for reading with readField.
+
+ private boolean[] isConvert;
// For each column, are we converting the row column?
- int[] projectionColumnNums;
+ private int[] projectionColumnNums;
// Assigning can be a subset of columns, so this is the projection --
// the batch column numbers.
- Category[] sourceCategories;
+ private Category[] sourceCategories;
// The data type category of each column being deserialized.
- PrimitiveCategory[] sourcePrimitiveCategories;
+ private PrimitiveCategory[] sourcePrimitiveCategories;
// The data type primitive category of each column being deserialized.
- int[] maxLengths;
+ private int[] maxLengths;
// For the CHAR and VARCHAR data types, the maximum character length of
// the columns. Otherwise, 0.
@@ -131,6 +139,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
private void allocateArrays(int count) {
isConvert = new boolean[count];
projectionColumnNums = new int[count];
+ Arrays.fill(projectionColumnNums, -1);
sourceCategories = new Category[count];
sourcePrimitiveCategories = new PrimitiveCategory[count];
maxLengths = new int[count];
@@ -231,14 +240,18 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
public void init(boolean[] columnsToIncludeTruncated) throws HiveException {
- if (columnsToIncludeTruncated != null) {
- deserializeRead.setColumnsToInclude(columnsToIncludeTruncated);
- }
+ // When a truncated columns-to-include array is supplied, its length must match the
+ // number of source type infos; the check below enforces this.
+ Preconditions.checkState(
+ columnsToIncludeTruncated == null ||
+ columnsToIncludeTruncated.length == sourceTypeInfos.length);
- final int columnCount = (columnsToIncludeTruncated == null ?
- sourceTypeInfos.length : columnsToIncludeTruncated.length);
+ final int columnCount = sourceTypeInfos.length;
allocateArrays(columnCount);
+ int includedCount = 0;
+ int[] includedIndices = new int[columnCount];
+
for (int i = 0; i < columnCount; i++) {
if (columnsToIncludeTruncated != null && !columnsToIncludeTruncated[i]) {
@@ -248,9 +261,16 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
} else {
initSourceEntry(i, i, sourceTypeInfos[i]);
-
+ includedIndices[includedCount++] = i;
}
}
+
+ // Optimizing for readField?
+ if (includedCount < columnCount && deserializeRead.isReadFieldSupported()) {
+ useReadField = true;
+ readFieldLogicalIndices = Arrays.copyOf(includedIndices, includedCount);
+ }
+
}
/**
@@ -258,37 +278,33 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
* DeserializedRead interface passed to the constructor to the target data types desired in
* the VectorizedRowBatch.
*
- * No projection -- the column range 0 .. count-1
- *
- * where count is the minimum of the target data type array size, included array size,
- * and source data type array size.
+ * No projection -- using the column range 0 .. columnCount-1
*
* @param targetTypeInfos
* @param columnsToIncludeTruncated
- * @return the minimum count described above is returned. That is, the number of columns
- * that will be processed by deserialize.
* @throws HiveException
*/
- public int initConversion(TypeInfo[] targetTypeInfos,
+ public void initConversion(TypeInfo[] targetTypeInfos,
boolean[] columnsToIncludeTruncated) throws HiveException {
- if (columnsToIncludeTruncated != null) {
- deserializeRead.setColumnsToInclude(columnsToIncludeTruncated);
- }
+ // We assume the caller will default any extra target columns with nulls, etc.
+ Preconditions.checkState(targetTypeInfos.length >= sourceTypeInfos.length);
- int targetColumnCount;
- if (columnsToIncludeTruncated == null) {
- targetColumnCount = targetTypeInfos.length;
- } else {
- targetColumnCount = Math.min(targetTypeInfos.length, columnsToIncludeTruncated.length);
- }
+ // When a truncated columns-to-include array is supplied, its length must be at least
+ // the number of source type infos. When longer, we assume the caller will default the
+ // extra columns with nulls, etc.
+ Preconditions.checkState(
+ columnsToIncludeTruncated == null ||
+ columnsToIncludeTruncated.length >= sourceTypeInfos.length);
- int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount);
- allocateArrays(sourceColumnCount);
- allocateConvertArrays(sourceColumnCount);
+ final int columnCount = sourceTypeInfos.length;
+ allocateArrays(columnCount);
+ allocateConvertArrays(columnCount);
+
+ int includedCount = 0;
+ int[] includedIndices = new int[columnCount];
boolean atLeastOneConvert = false;
- for (int i = 0; i < sourceColumnCount; i++) {
+ for (int i = 0; i < columnCount; i++) {
if (columnsToIncludeTruncated != null && !columnsToIncludeTruncated[i]) {
@@ -320,9 +336,17 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
initSourceEntry(i, i, sourceTypeInfo);
}
+
+ includedIndices[includedCount++] = i;
}
}
+ // Optimizing for readField?
+ if (includedCount < columnCount && deserializeRead.isReadFieldSupported()) {
+ useReadField = true;
+ readFieldLogicalIndices = Arrays.copyOf(includedIndices, includedCount);
+ }
+
if (atLeastOneConvert) {
// Let the VectorAssignRow class do the conversion.
@@ -330,8 +354,6 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
convertVectorAssignRow.initConversion(sourceTypeInfos, targetTypeInfos,
columnsToIncludeTruncated);
}
-
- return sourceColumnCount;
}
public void init() throws HiveException {
@@ -339,7 +361,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
}
/**
- * Deserialize one row column value.
+ * Store one row column value that is the current value in deserializeRead.
*
* @param batch
* @param batchIndex
@@ -351,27 +373,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
* in a hash table entry that is immutable.
* @throws IOException
*/
- private void deserializeRowColumn(VectorizedRowBatch batch, int batchIndex,
+ private void storeRowColumn(VectorizedRowBatch batch, int batchIndex,
int logicalColumnIndex, boolean canRetainByteRef) throws IOException {
- Category sourceCategory = sourceCategories[logicalColumnIndex];
- if (sourceCategory == null) {
- /*
- * This is a column that we don't want (i.e. not included).
- * The deserializeRead.readCheckNull() will read the field.
- */
- boolean isNull = deserializeRead.readCheckNull();
- Preconditions.checkState(isNull);
- return;
- }
final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
- if (deserializeRead.readCheckNull()) {
- VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
- return;
- }
-
- // We have a value for the row column.
- switch (sourceCategory) {
+ switch (sourceCategories[logicalColumnIndex]) {
case PRIMITIVE:
{
PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex];
@@ -546,7 +552,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
}
break;
default:
- throw new RuntimeException("Category " + sourceCategory.name() + " not supported");
+ throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported");
}
// We always set the null flag to false when there is a value.
@@ -554,7 +560,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
}
/**
- * Deserialize and convert one row column value.
+ * Convert one row column value that is the current value in deserializeRead.
*
* We deserialize into a writable and then pass that writable to an instance of VectorAssignRow
* to convert the writable to the target data type and assign it into the VectorizedRowBatch.
@@ -564,32 +570,14 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
* @param logicalColumnIndex
* @throws IOException
*/
- private void deserializeConvertRowColumn(VectorizedRowBatch batch, int batchIndex,
+ private void convertRowColumn(VectorizedRowBatch batch, int batchIndex,
int logicalColumnIndex) throws IOException {
- Category sourceCategory = sourceCategories[logicalColumnIndex];
- if (sourceCategory == null) {
- /*
- * This is a column that we don't want (i.e. not included).
- * The deserializeRead.readCheckNull() will read the field.
- */
- boolean isNull = deserializeRead.readCheckNull();
- Preconditions.checkState(isNull);
- return;
- }
-
final int projectionColumnNum = projectionColumnNums[logicalColumnIndex];
- if (deserializeRead.readCheckNull()) {
- VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex);
- return;
- }
-
- // We have a value for the row column.
Writable convertSourceWritable = convertSourceWritables[logicalColumnIndex];
- switch (sourceCategory) {
+ switch (sourceCategories[logicalColumnIndex]) {
case PRIMITIVE:
{
- PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex];
- switch (sourcePrimitiveCategory) {
+ switch (sourcePrimitiveCategories[logicalColumnIndex]) {
case VOID:
convertSourceWritable = null;
break;
@@ -702,13 +690,13 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
deserializeRead.currentHiveIntervalDayTimeWritable);
break;
default:
- throw new RuntimeException("Primitive category " + sourcePrimitiveCategory.name() +
+ throw new RuntimeException("Primitive category " + sourcePrimitiveCategories[logicalColumnIndex] +
" not supported");
}
}
break;
default:
- throw new RuntimeException("Category " + sourceCategory.name() + " not supported");
+ throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported");
}
/*
@@ -746,17 +734,51 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
* @throws IOException
*/
public void deserialize(VectorizedRowBatch batch, int batchIndex) throws IOException {
+
+ // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input
+ // bytes with the BytesColumnVector.setRef method.
+
final int count = isConvert.length;
- for (int i = 0; i < count; i++) {
- if (isConvert[i]) {
- deserializeConvertRowColumn(batch, batchIndex, i);
- } else {
- // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input
- // bytes with the BytesColumnVector.setRef method.
- deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false);
+ if (!useReadField) {
+ for (int i = 0; i < count; i++) {
+ final int projectionColumnNum = projectionColumnNums[i];
+ if (projectionColumnNum == -1) {
+ // We must read through fields we do not want.
+ deserializeRead.skipNextField();
+ continue;
+ }
+ if (!deserializeRead.readNextField()) {
+ ColumnVector colVector = batch.cols[projectionColumnNum];
+ colVector.isNull[batchIndex] = true;
+ colVector.noNulls = false;
+ continue;
+ }
+ // The current* members of deserializeRead have the field value.
+ if (isConvert[i]) {
+ convertRowColumn(batch, batchIndex, i);
+ } else {
+ storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false);
+ }
+ }
+ } else {
+ final int readFieldCount = readFieldLogicalIndices.length;
+ for (int i = 0; i < readFieldCount; i++) {
+ final int logicalIndex = readFieldLogicalIndices[i];
+ // Jump to the field we want and read it.
+ if (!deserializeRead.readField(logicalIndex)) {
+ ColumnVector colVector = batch.cols[projectionColumnNums[logicalIndex]];
+ colVector.isNull[batchIndex] = true;
+ colVector.noNulls = false;
+ continue;
+ }
+ // The current* members of deserializeRead have the field value.
+ if (isConvert[logicalIndex]) {
+ convertRowColumn(batch, batchIndex, logicalIndex);
+ } else {
+ storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ false);
+ }
}
}
- deserializeRead.extraFieldsCheck();
}
/**
@@ -781,16 +803,46 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
*/
public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException {
final int count = isConvert.length;
- for (int i = 0; i < count; i++) {
- if (isConvert[i]) {
- deserializeConvertRowColumn(batch, batchIndex, i);
- } else {
- // Pass true for canRetainByteRef since we will be keeping byte references to the input
- // bytes with the BytesColumnVector.setRef method.
- deserializeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true);
+ if (!useReadField) {
+ for (int i = 0; i < count; i++) {
+ final int projectionColumnNum = projectionColumnNums[i];
+ if (projectionColumnNum == -1) {
+ // We must read through fields we do not want.
+ deserializeRead.skipNextField();
+ continue;
+ }
+ if (!deserializeRead.readNextField()) {
+ ColumnVector colVector = batch.cols[projectionColumnNum];
+ colVector.isNull[batchIndex] = true;
+ colVector.noNulls = false;
+ continue;
+ }
+ // The current* members of deserializeRead have the field value.
+ if (isConvert[i]) {
+ convertRowColumn(batch, batchIndex, i);
+ } else {
+ storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true);
+ }
+ }
+ } else {
+ final int readFieldCount = readFieldLogicalIndices.length;
+ for (int i = 0; i < readFieldCount; i++) {
+ final int logicalIndex = readFieldLogicalIndices[i];
+ // Jump to the field we want and read it.
+ if (!deserializeRead.readField(logicalIndex)) {
+ ColumnVector colVector = batch.cols[projectionColumnNums[logicalIndex]];
+ colVector.isNull[batchIndex] = true;
+ colVector.noNulls = false;
+ continue;
+ }
+ // The current* members of deserializeRead have the field value.
+ if (isConvert[logicalIndex]) {
+ convertRowColumn(batch, batchIndex, logicalIndex);
+ } else {
+ storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ true);
+ }
}
}
- deserializeRead.extraFieldsCheck();
}
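To see how the new pieces of VectorDeserializeRow fit together, here is a hedged sketch of a caller on the column-pruned path; typeInfos, serdeParams, rowBytes, rowLength, batch, and batchIndex are assumed to be prepared elsewhere and are not from the patch:

  // Suppose a 4-column text row where only columns 0 and 2 are needed.
  boolean[] include = new boolean[] { true, false, true, false };

  LazySimpleDeserializeRead lazyRead =
      new LazySimpleDeserializeRead(typeInfos, /* useExternalBuffer */ true, serdeParams);
  VectorDeserializeRow<LazySimpleDeserializeRead> rowDeserializer =
      new VectorDeserializeRow<LazySimpleDeserializeRead>(lazyRead);

  // init() now validates include.length against the source schema and, because
  // LazySimpleDeserializeRead reports isReadFieldSupported() == true while only
  // 2 of 4 columns are included, switches on useReadField so that deserialize()
  // jumps straight to fields 0 and 2 with readField().
  rowDeserializer.init(include);

  lazyRead.set(rowBytes, 0, rowLength);
  rowDeserializer.deserialize(batch, batchIndex);
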
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index c7fa0db..323419c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -253,6 +253,20 @@ public class VectorMapOperator extends AbstractMapOperator {
// This type information specifies the data types the partition needs to read.
TypeInfo[] dataTypeInfos = vectorPartDesc.getDataTypeInfos();
+ // We need to provide the minimum number of columns to be read so
+ // LazySimpleDeserializeRead's separator parser does not waste time.
+ //
+ Preconditions.checkState(dataColumnsToIncludeTruncated != null);
+ TypeInfo[] minimalDataTypeInfos;
+ if (dataColumnsToIncludeTruncated.length < dataTypeInfos.length) {
+ minimalDataTypeInfos =
+ Arrays.copyOf(dataTypeInfos, dataColumnsToIncludeTruncated.length);
+ } else {
+ minimalDataTypeInfos = dataTypeInfos;
+ }
+
+ readerColumnCount = minimalDataTypeInfos.length;
+
switch (vectorPartDesc.getVectorDeserializeType()) {
case LAZY_SIMPLE:
{
@@ -262,7 +276,7 @@ public class VectorMapOperator extends AbstractMapOperator {
LazySimpleDeserializeRead lazySimpleDeserializeRead =
new LazySimpleDeserializeRead(
- dataTypeInfos,
+ minimalDataTypeInfos,
/* useExternalBuffer */ true,
simpleSerdeParams);
@@ -270,8 +284,7 @@ public class VectorMapOperator extends AbstractMapOperator {
new VectorDeserializeRow<LazySimpleDeserializeRead>(lazySimpleDeserializeRead);
// Initialize with data row type conversion parameters.
- readerColumnCount =
- vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated);
+ vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated);
deserializeRead = lazySimpleDeserializeRead;
}
@@ -288,8 +301,7 @@ public class VectorMapOperator extends AbstractMapOperator {
new VectorDeserializeRow<LazyBinaryDeserializeRead>(lazyBinaryDeserializeRead);
// Initialize with data row type conversion parameters.
- readerColumnCount =
- vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated);
+ vectorDeserializeRow.initConversion(tableRowTypeInfos, dataColumnsToIncludeTruncated);
deserializeRead = lazyBinaryDeserializeRead;
}
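The Arrays.copyOf above is the crux of the Text-parsing win; a sketch with assumed sizes:

  // The partition carries 100 data columns but the query reads only columns
  // 0..2, so dataColumnsToIncludeTruncated has length 3. Handing the 3-element
  // prefix of the type array to LazySimpleDeserializeRead means its separator
  // parser stops after the third field instead of scanning the whole line.
  TypeInfo[] minimal = Arrays.copyOf(dataTypeInfos, 3);
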
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
index 726a937..bc892ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastLongHashTable.java
@@ -79,7 +79,7 @@ public abstract class VectorMapJoinFastLongHashTable
int keyLength = currentKey.getLength();
keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
try {
- if (keyBinarySortableDeserializeRead.readCheckNull()) {
+ if (!keyBinarySortableDeserializeRead.readNextField()) {
return;
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
index 456e6ba..ab39e58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastStringCommon.java
@@ -46,7 +46,7 @@ public class VectorMapJoinFastStringCommon {
int keyLength = currentKey.getLength();
keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
try {
- if (keyBinarySortableDeserializeRead.readCheckNull()) {
+ if (!keyBinarySortableDeserializeRead.readNextField()) {
return;
}
} catch (Exception e) {
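The one-line changes in this loader and the long-hash loader above are a pure boolean inversion: readCheckNull() answered "is the key NULL?", while readNextField() answers "did we read a value?". A sketch of the equivalent guard:

  // Same early-out the old readCheckNull() == true path took.
  if (!keyBinarySortableDeserializeRead.readNextField()) {
    return;   // key is NULL: do not add it to the hash table
  }
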
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java
index ac85899..6a9039f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedLongCommon.java
@@ -65,62 +65,6 @@ public class VectorMapJoinOptimizedLongCommon {
return max;
}
- /*
- * For now, just use MapJoinBytesTableContainer / HybridHashTableContainer directly.
-
- public void adaptPutRow(VectorMapJoinOptimizedHashTable hashTable,
- BytesWritable currentKey, BytesWritable currentValue)
- throws SerDeException, HiveException, IOException {
-
- if (useMinMax) {
- // Peek at the BinarySortable key to extract the long so we can determine min and max.
- byte[] keyBytes = currentKey.getBytes();
- int keyLength = currentKey.getLength();
- keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
- if (keyBinarySortableDeserializeRead.readCheckNull()) {
- if (isOuterJoin) {
- return;
- } else {
- // For inner join, we expect all NULL values to have been filtered out before now.
- throw new HiveException("Unexpected NULL");
- }
- }
- long key = 0;
- switch (hashTableKeyType) {
- case BOOLEAN:
- key = (keyBinarySortableDeserializeRead.readBoolean() ? 1 : 0);
- break;
- case BYTE:
- key = (long) keyBinarySortableDeserializeRead.readByte();
- break;
- case SHORT:
- key = (long) keyBinarySortableDeserializeRead.readShort();
- break;
- case INT:
- key = (long) keyBinarySortableDeserializeRead.readInt();
- break;
- case LONG:
- key = keyBinarySortableDeserializeRead.readLong();
- break;
- default:
- throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());
- }
- if (key < min) {
- min = key;
- }
- if (key > max) {
- max = key;
- }
-
- // byte[] bytes = Arrays.copyOf(currentKey.get(), currentKey.getLength());
- // LOG.debug("VectorMapJoinOptimizedLongCommon adaptPutRow key " + key + " min " + min + " max " + max + " hashTableKeyType " + hashTableKeyType.name() + " hex " + Hex.encodeHexString(bytes));
-
- }
-
- hashTable.putRowInternal(currentKey, currentValue);
- }
- */
-
public SerializedBytes serialize(long key) throws IOException {
keyBinarySortableSerializeWrite.reset();
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java
index 39c2d49..072919b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedStringCommon.java
@@ -45,32 +45,6 @@ public class VectorMapJoinOptimizedStringCommon {
private transient SerializedBytes serializedBytes;
- /*
- private BytesWritable bytesWritable;
-
- public void adaptPutRow(VectorMapJoinOptimizedHashTable hashTable,
- BytesWritable currentKey, BytesWritable currentValue)
- throws SerDeException, HiveException, IOException {
-
- byte[] keyBytes = currentKey.getBytes();
- int keyLength = currentKey.getLength();
- keyBinarySortableDeserializeRead.set(keyBytes, 0, keyLength);
- if (keyBinarySortableDeserializeRead.readCheckNull()) {
- if (isOuterJoin) {
- return;
- } else {
- // For inner join, we expect all NULL values to have been filtered out before now.
- throw new HiveException("Unexpected NULL");
- }
- }
- keyBinarySortableDeserializeRead.readString(readStringResults);
-
- bytesWritable.set(readStringResults.bytes, readStringResults.start, readStringResults.length);
-
- hashTable.putRowInternal(bytesWritable, currentValue);
- }
- */
-
public SerializedBytes serialize(byte[] keyBytes, int keyStart, int keyLength) throws IOException {
keyBinarySortableSerializeWrite.reset();
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index b760988..46bdba6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -29,6 +29,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.regex.Pattern;
@@ -648,11 +649,27 @@ public class Vectorizer implements PhysicalPlanResolver {
if (inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
deserializerClassName.equals(LazySimpleSerDe.class.getName())) {
- pd.setVectorPartitionDesc(
- VectorPartitionDesc.createVectorDeserialize(
- inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE));
+ Properties properties = pd.getTableDesc().getProperties();
+ String lastColumnTakesRestString =
+ properties.getProperty(serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST);
+ boolean lastColumnTakesRest =
+ (lastColumnTakesRestString != null &&
+ lastColumnTakesRestString.equalsIgnoreCase("true"));
+ if (lastColumnTakesRest) {
+
+ // If row mode will not catch this, then inform.
+ if (useRowDeserialize) {
+ LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
+ " when " + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + "is true");
+ return false;
+ }
+ } else {
+ pd.setVectorPartitionDesc(
+ VectorPartitionDesc.createVectorDeserialize(
+ inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE));
- return true;
+ return true;
+ }
} else if (inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
deserializerClassName.equals(LazyBinarySerDe.class.getName())) {
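For readers unfamiliar with the SerDe property being guarded against, a worked illustration (the 3-column schema and ',' separator are assumptions):

  // Effect of serialization.last.column.takes.rest, which the rewritten
  // LazySimpleDeserializeRead no longer implements (its constructor throws):
  //
  //   line:  "a,b,c,d,e"
  //   normal parse:            col0="a", col1="b", col2="c"      ("d,e" unread)
  //   last-column-takes-rest:  col0="a", col1="b", col2="c,d,e"  (last column
  //                                                  absorbs the rest of the line)
  //
  // Hence Vectorizer must keep such tables off the vectorized text path.
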
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
index 238c136..8ffff9d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
@@ -96,7 +96,7 @@ public class TestVectorSerDeRow extends TestCase {
Object expected = expectedRow[i];
PrimitiveCategory primitiveCategory = primitiveCategories[i];
PrimitiveTypeInfo primitiveTypeInfo = source.primitiveTypeInfos()[i];
- if (deserializeRead.readCheckNull()) {
+ if (!deserializeRead.readNextField()) {
throw new HiveException("Unexpected NULL");
}
switch (primitiveCategory) {
@@ -282,9 +282,7 @@ public class TestVectorSerDeRow extends TestCase {
throw new HiveException("Unexpected primitive category " + primitiveCategory);
}
}
- deserializeRead.extraFieldsCheck();
- TestCase.assertTrue(!deserializeRead.readBeyondConfiguredFieldsWarned());
- TestCase.assertTrue(!deserializeRead.bufferRangeHasExtraDataWarned());
+ TestCase.assertTrue(deserializeRead.isEndOfInputReached());
}
void serializeBatch(VectorizedRowBatch batch, VectorSerializeRow vectorSerializeRow,
@@ -382,11 +380,13 @@ public class TestVectorSerDeRow extends TestCase {
Object[] expectedRow = randomRows[firstRandomRowIndex + i];
for (int c = 0; c < rowSize; c++) {
- if (row[c] == null) {
+ Object rowObj = row[c];
+ Object expectedObj = expectedRow[c];
+ if (rowObj == null) {
fail("Unexpected NULL from extractRow");
}
- if (!row[c].equals(expectedRow[c])) {
- fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch (" + primitiveTypeInfos[c].getPrimitiveCategory() + " actual value " + row[c] + " and expected value " + expectedRow[c] + ")");
+ if (!rowObj.equals(expectedObj)) {
+ fail("Row " + (firstRandomRowIndex + i) + " and column " + c + " mismatch (" + primitiveTypeInfos[c].getPrimitiveCategory() + " actual value " + rowObj + " and expected value " + expectedObj + ")");
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
index 7f68186..bc7a658 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/CheckFastRowHashMap.java
@@ -79,10 +79,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
Writable writable = (Writable) row[index];
VerifyFastRow.verifyDeserializeRead(lazyBinaryDeserializeRead, (PrimitiveTypeInfo) typeInfos[index], writable);
}
- lazyBinaryDeserializeRead.extraFieldsCheck();
- TestCase.assertTrue(!lazyBinaryDeserializeRead.readBeyondConfiguredFieldsWarned());
-
- TestCase.assertTrue(!lazyBinaryDeserializeRead.bufferRangeHasExtraDataWarned());
+ TestCase.assertTrue(lazyBinaryDeserializeRead.isEndOfInputReached());
ref = hashMapResult.next();
if (a == count - 1) {
@@ -171,10 +168,7 @@ public class CheckFastRowHashMap extends CheckFastHashTable {
if (thrown) {
TestCase.fail("Not expecting an exception to be thrown for the non-clipped case...");
}
- lazyBinaryDeserializeRead.extraFieldsCheck();
- TestCase.assertTrue(!lazyBinaryDeserializeRead.readBeyondConfiguredFieldsWarned());
-
- TestCase.assertTrue(!lazyBinaryDeserializeRead.bufferRangeHasExtraDataWarned());
+ TestCase.assertTrue(lazyBinaryDeserializeRead.isEndOfInputReached());
}
ref = hashMapResult.next();
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
index 118e9e2..239db73 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VerifyFastRow.java
@@ -64,7 +64,7 @@ public class VerifyFastRow {
boolean isNull;
- isNull = deserializeRead.readCheckNull();
+ isNull = !deserializeRead.readNextField();
if (isNull) {
if (writable != null) {
TestCase.fail("Field reports null but object is not null");
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
index 0cbc8d0..a7785b2 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/fast/BinarySortableDeserializeRead.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
* Directly deserialize with the caller reading field-by-field the LazyBinary serialization format.
*
* The caller is responsible for calling the read method for the right type of each field
- * (after calling readCheckNull).
+ * (after calling readNextField).
*
* Reading some fields require a results object to receive value information. A separate
* results object is created by the caller at initialization per different field even for the same
@@ -53,7 +53,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
// The sort order (ascending/descending) for each field. Set to true when descending (invert).
private boolean[] columnSortOrderIsDesc;
- // Which field we are on. We start with -1 so readCheckNull can increment once and the read
+ // Which field we are on. We start with -1 so readNextField can increment once and the read
// field data methods don't increment.
private int fieldIndex;
@@ -72,9 +72,6 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
private byte[] tempDecimalBuffer;
- private boolean readBeyondConfiguredFieldsWarned;
- private boolean bufferRangeHasExtraDataWarned;
-
private InputByteBuffer inputByteBuffer = new InputByteBuffer();
/*
@@ -96,8 +93,6 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
Arrays.fill(this.columnSortOrderIsDesc, false);
}
inputByteBuffer = new InputByteBuffer();
- readBeyondConfiguredFieldsWarned = false;
- bufferRangeHasExtraDataWarned = false;
internalBufferLen = -1;
}
@@ -151,28 +146,28 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
/*
- * Reads the NULL information for a field.
+ * Reads the next field.
+ *
+ * Afterwards, reading is positioned to the next field.
+ *
+ * @return Return true when the field was not null and data is put in the appropriate
+ * current* member.
+ * Otherwise, false when the field is null.
*
- * @return Returns true when the field is NULL; reading is positioned to the next field.
- * Otherwise, false when the field is NOT NULL; reading is positioned to the field data.
*/
@Override
- public boolean readCheckNull() throws IOException {
+ public boolean readNextField() throws IOException {
// We start with fieldIndex as -1 so we can increment once here and then the read
// field data methods don't increment.
fieldIndex++;
if (fieldIndex >= fieldCount) {
- // Reading beyond the specified field count produces NULL.
- if (!readBeyondConfiguredFieldsWarned) {
- doReadBeyondConfiguredFieldsWarned();
- }
- return true;
+ return false;
}
if (inputByteBuffer.isEof()) {
// Also, reading beyond our byte range produces NULL.
- return true;
+ return false;
}
fieldStart = inputByteBuffer.tell();
@@ -180,20 +175,19 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
byte isNullByte = inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]);
if (isNullByte == 0) {
- return true;
+ return false;
}
/*
* We have a field and are positioned to it. Read it.
*/
- boolean isNull = false; // Assume.
switch (primitiveCategories[fieldIndex]) {
case BOOLEAN:
currentBoolean = (inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]) == 2);
- break;
+ return true;
case BYTE:
currentByte = (byte) (inputByteBuffer.read(columnSortOrderIsDesc[fieldIndex]) ^ 0x80);
- break;
+ return true;
case SHORT:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -201,7 +195,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
v = (v << 8) + (inputByteBuffer.read(invert) & 0xff);
currentShort = (short) v;
}
- break;
+ return true;
case INT:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -211,7 +205,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentInt = v;
}
- break;
+ return true;
case LONG:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -221,7 +215,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentLong = v;
}
- break;
+ return true;
case DATE:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -231,7 +225,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentDateWritable.set(v);
}
- break;
+ return true;
case TIMESTAMP:
{
if (tempTimestampBytes == null) {
@@ -243,7 +237,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentTimestampWritable.setBinarySortable(tempTimestampBytes, 0);
}
- break;
+ return true;
case FLOAT:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -260,7 +254,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentFloat = Float.intBitsToFloat(v);
}
- break;
+ return true;
case DOUBLE:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -277,7 +271,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentDouble = Double.longBitsToDouble(v);
}
- break;
+ return true;
case BINARY:
case STRING:
case CHAR:
@@ -333,7 +327,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
}
}
- break;
+ return true;
case INTERVAL_YEAR_MONTH:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -343,7 +337,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentHiveIntervalYearMonthWritable.set(v);
}
- break;
+ return true;
case INTERVAL_DAY_TIME:
{
final boolean invert = columnSortOrderIsDesc[fieldIndex];
@@ -357,7 +351,7 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
currentHiveIntervalDayTimeWritable.set(totalSecs, nanos);
}
- break;
+ return true;
case DECIMAL:
{
// Since enforcing precision and scale can cause a HiveDecimal to become NULL,
@@ -428,25 +422,26 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
HiveDecimal decimal = currentHiveDecimalWritable.getHiveDecimal(precision, scale);
if (decimal == null) {
- isNull = true;
- } else {
- // Put value back into writable.
- currentHiveDecimalWritable.set(decimal);
+ return false;
}
+ // Put value back into writable.
+ currentHiveDecimalWritable.set(decimal);
}
- break;
+ return true;
default:
throw new RuntimeException("Unexpected primitive type category " + primitiveCategories[fieldIndex]);
}
+ }
- /*
- * Now that we have read through the field -- did we really want it?
- */
- if (columnsToInclude != null && !columnsToInclude[fieldIndex]) {
- isNull = true;
- }
-
- return isNull;
+ /*
+ * Reads through an undesired field.
+ *
+ * No data values are valid after this call.
+ * Designed for skipping columns that are not included.
+ */
+ public void skipNextField() throws IOException {
+ // Not a known use case for BinarySortable -- so don't optimize.
+ readNextField();
}
@Override
@@ -476,44 +471,17 @@ public final class BinarySortableDeserializeRead extends DeserializeRead {
}
/*
- * Call this method after all fields have been read to check for extra fields.
- */
- public void extraFieldsCheck() {
- if (!inputByteBuffer.isEof()) {
- // We did not consume all of the byte range.
- if (!bufferRangeHasExtraDataWarned) {
- // Warn only once.
- int length = inputByteBuffer.getEnd() - start;
- int remaining = inputByteBuffer.getEnd() - inputByteBuffer.tell();
- LOG.info("Not all fields were read in the buffer range! Buffer range " + start
- + " for length " + length + " but " + remaining + " bytes remain. "
- + "(total buffer length " + inputByteBuffer.getData().length + ")"
- + " Ignoring similar problems.");
- bufferRangeHasExtraDataWarned = true;
- }
- }
- }
-
- /*
- * Read integrity warning flags.
- */
- @Override
- public boolean readBeyondConfiguredFieldsWarned() {
- return readBeyondConfiguredFieldsWarned;
- }
- @Override
- public boolean bufferRangeHasExtraDataWarned() {
- return bufferRangeHasExtraDataWarned;
- }
-
- /*
- * Pull these out of the regular execution path.
+ * This method may be called after all fields have been read to check
+ * for unread fields.
+ *
+ * Note that when reading is optimized to skip unneeded columns, worrying about whether
+ * all of the input was consumed is not appropriate (often we do not read it all by
+ * design).
+ *
+ * Since LazySimpleDeserializeRead parses the line through the last desired column, it
+ * does support this method.
*/
-
- private void doReadBeyondConfiguredFieldsWarned() {
- // Warn only once.
- LOG.info("Reading beyond configured fields! Configured " + fieldCount + " fields but "
- + " reading more (NULLs returned). Ignoring similar problems.");
- readBeyondConfiguredFieldsWarned = true;
+ public boolean isEndOfInputReached() {
+ return inputByteBuffer.isEof();
}
}
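A hedged sketch of a field-by-field consumer under the rewritten BinarySortable protocol; keyBytes, keyLength, fieldCount, and the wantField/consume/markNull helpers are hypothetical names for illustration:

  read.set(keyBytes, 0, keyLength);
  for (int i = 0; i < fieldCount; i++) {
    if (!wantField[i]) {
      read.skipNextField();        // reads through the field; no value materialized
    } else if (read.readNextField()) {
      consume(i, read);            // the current* members hold field i's value
    } else {
      markNull(i);                 // field i was NULL (or the input was exhausted)
    }
  }
  // Replaces the old extraFieldsCheck()/warned-flag machinery:
  boolean fullyConsumed = read.isEndOfInputReached();
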
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
index 1600fec..ac931d6 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/fast/DeserializeRead.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
* Directly deserialize with the caller reading field-by-field a serialization format.
*
* The caller is responsible for calling the read method for the right type of each field
- * (after calling readCheckNull).
+ * (after calling readNextField).
*
* Reading some fields require a results object to receive value information. A separate
* results object is created by the caller at initialization per different field even for the same
@@ -49,18 +49,16 @@ public abstract class DeserializeRead {
protected boolean useExternalBuffer;
- protected boolean[] columnsToInclude;
-
protected Category[] categories;
protected PrimitiveCategory[] primitiveCategories;
/**
* Constructor.
*
- * When useExternalBuffer is specified true and readCheckNull reads a string/char/varchar/binary
+ * When useExternalBuffer is specified true and readNextField reads a string/char/varchar/binary
* field, it will request an external buffer to receive the data of format conversion.
*
- * if (!deserializeRead.readCheckNull()) {
+ * if (deserializeRead.readNextField()) {
* if (deserializeRead.currentExternalBufferNeeded) {
* <Ensure external buffer is at least deserializeRead.currentExternalBufferNeededLen bytes>
* deserializeRead.copyToExternalBuffer(externalBuffer, externalBufferStart);
@@ -121,8 +119,6 @@ public abstract class DeserializeRead {
this.useExternalBuffer = useExternalBuffer;
}
-
- columnsToInclude = null;
}
// Don't allow for public.
@@ -137,37 +133,62 @@ public abstract class DeserializeRead {
}
/*
- * If some fields are are not going to be used by the query, use this routine to specify
- * the columns to return. The readCheckNull method will automatically return NULL for the
- * other columns.
+ * Set the range of bytes to be deserialized.
*/
- public void setColumnsToInclude(boolean[] columnsToInclude) {
- this.columnsToInclude = columnsToInclude;
- }
+ public abstract void set(byte[] bytes, int offset, int length);
/*
- * Set the range of bytes to be deserialized.
+ * Reads the next field.
+ *
+ * Afterwards, reading is positioned to the next field.
+ *
+ * @return Return true when the field was not null and data is put in the appropriate
+ * current* member.
+ * Otherwise, false when the field is null.
+ *
*/
- public abstract void set(byte[] bytes, int offset, int length);
+ public abstract boolean readNextField() throws IOException;
/*
- * Reads the NULL information for a field.
+ * Reads through an undesired field.
*
- * @return Return true when the field is NULL; reading is positioned to the next field.
- * Otherwise, false when the field is NOT NULL; reading is positioned to the field data.
+ * No data values are valid after this call.
+ * Designed for skipping columns that are not included.
*/
- public abstract boolean readCheckNull() throws IOException;
+ public abstract void skipNextField() throws IOException;
/*
- * Call this method after all fields have been read to check for extra fields.
+ * Returns true if the readField method is supported.
*/
- public abstract void extraFieldsCheck();
+ public boolean isReadFieldSupported() {
+ return false;
+ }
/*
- * Read integrity warning flags.
+ * When supported, read a field by field number (i.e. random access).
+ *
+ * Currently, only LazySimpleDeserializeRead supports this.
+ *
+ * @return Return true when the field was not null and data is put in the appropriate
+ * current* member.
+ * Otherwise, false when the field is null.
+ */
+ public boolean readField(int fieldIndex) throws IOException {
+ throw new RuntimeException("Not supported");
+ }
+
+ /*
+ * This method may be called after all fields have been read to check
+ * for unread fields.
+ *
+ * Note that when reading is optimized to skip unneeded columns, worrying about whether
+ * all of the input was consumed is not appropriate (often we do not read it all by
+ * design).
+ *
+ * Since LazySimpleDeserializeRead parses the line through the last desired column, it
+ * does support this method.
*/
- public abstract boolean readBeyondConfiguredFieldsWarned();
- public abstract boolean bufferRangeHasExtraDataWarned();
+ public abstract boolean isEndOfInputReached();
/*
* Get detailed read position information to help diagnose exceptions.
@@ -175,7 +196,7 @@ public abstract class DeserializeRead {
public abstract String getDetailedReadPositionString();
/*
- * These members hold the current value that was read when readCheckNull return false.
+ * These members hold the current value that was read when readNextField returns true.
*/
/*
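Completing the Javadoc's external-buffer fragment above into a fuller sketch; the buffer-growth policy and the process(...) consumer are assumptions, not part of the patch:

  byte[] externalBuffer = new byte[0];
  if (deserializeRead.readNextField()) {
    if (deserializeRead.currentExternalBufferNeeded) {
      // Format conversion cannot reference the input bytes in place; ensure
      // capacity, then have the reader copy the converted bytes out.
      final int needed = deserializeRead.currentExternalBufferNeededLen;
      if (externalBuffer.length < needed) {
        externalBuffer = new byte[needed];
      }
      deserializeRead.copyToExternalBuffer(externalBuffer, 0);
      process(externalBuffer, 0, needed);
    } else {
      // No conversion needed: the value is addressable in place.
      process(deserializeRead.currentBytes,
          deserializeRead.currentBytesStart,
          deserializeRead.currentBytesLength);
    }
  }
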
http://git-wip-us.apache.org/repos/asf/hive/blob/0b62e6f3/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
index 07709d8..daf2cfb 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/fast/LazySimpleDeserializeRead.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.serde2.lazy.fast;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.CharacterCodingException;
+import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.util.Arrays;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hive.serde2.lazy.LazyLong;
import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
import org.apache.hadoop.hive.serde2.lazy.LazyShort;
import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
@@ -47,7 +49,7 @@ import org.apache.hive.common.util.TimestampParser;
* serialization format.
*
* The caller is responsible for calling the read method for the right type of each field
- * (after calling readCheckNull).
+ * (after calling readNextField).
*
* Reading some fields require a results object to receive value information. A separate
* results object is created by the caller at initialization per different field even for the same
@@ -62,49 +64,63 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
private int[] startPosition;
- private byte separator;
- private boolean isEscaped;
- private byte escapeChar;
- private byte[] nullSequenceBytes;
- private boolean isExtendedBooleanLiteral;
- private boolean lastColumnTakesRest;
+ private final byte separator;
+ private final boolean isEscaped;
+ private final byte escapeChar;
+ private final int[] escapeCounts;
+ private final byte[] nullSequenceBytes;
+ private final boolean isExtendedBooleanLiteral;
+
+ private final int fieldCount;
private byte[] bytes;
private int start;
- private int offset;
private int end;
- private int fieldCount;
- private int fieldIndex;
- private int parseFieldIndex;
- private int fieldStart;
- private int fieldLength;
+ private boolean parsed;
+
+ // Used by readNextField/skipNextField and not by readField.
+ private int nextFieldIndex;
+ // For getDetailedReadPositionString.
+ private int currentFieldIndex;
+ private int currentFieldStart;
+ private int currentFieldLength;
+
+ // For string/char/varchar buffering when there are escapes.
private int internalBufferLen;
private byte[] internalBuffer;
- private TimestampParser timestampParser;
+ private final TimestampParser timestampParser;
- private boolean extraFieldWarned;
- private boolean missingFieldWarned;
+ private boolean isEndOfInputReached;
public LazySimpleDeserializeRead(TypeInfo[] typeInfos, boolean useExternalBuffer,
byte separator, LazySerDeParameters lazyParams) {
super(typeInfos, useExternalBuffer);
+ fieldCount = typeInfos.length;
+
// Field length is difference between positions hence one extra.
- startPosition = new int[typeInfos.length + 1];
+ startPosition = new int[fieldCount + 1];
this.separator = separator;
isEscaped = lazyParams.isEscaped();
- escapeChar = lazyParams.getEscapeChar();
+ if (isEscaped) {
+ escapeChar = lazyParams.getEscapeChar();
+ escapeCounts = new int[fieldCount];
+ } else {
+ escapeChar = (byte) 0;
+ escapeCounts = null;
+ }
nullSequenceBytes = lazyParams.getNullSequence().getBytes();
isExtendedBooleanLiteral = lazyParams.isExtendedBooleanLiteral();
- lastColumnTakesRest = lazyParams.isLastColumnTakesRest();
+ if (lazyParams.isLastColumnTakesRest()) {
+ throw new RuntimeException("serialization.last.column.takes.rest not supported");
+ }
+
+ timestampParser = new TimestampParser();
- fieldCount = typeInfos.length;
- extraFieldWarned = false;
- missingFieldWarned = false;
internalBufferLen = -1;
}
@@ -113,21 +129,16 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
this(typeInfos, useExternalBuffer, lazyParams.getSeparators()[0], lazyParams);
}
- // Not public since we must have the field count so every 8 fields NULL bytes can be navigated.
- private LazySimpleDeserializeRead() {
- super();
- }
-
/*
* Set the range of bytes to be deserialized.
*/
@Override
public void set(byte[] bytes, int offset, int length) {
this.bytes = bytes;
- this.offset = offset;
start = offset;
end = offset + length;
- fieldIndex = -1;
+ parsed = false;
+ nextFieldIndex = -1;
}
/*
@@ -147,19 +158,16 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
sb.append(" fields with types ");
sb.append(Arrays.toString(typeInfos));
sb.append(". ");
- if (fieldIndex == -1) {
- sb.append("Error during field delimitor parsing of field #");
- sb.append(parseFieldIndex);
+ if (!parsed) {
+ sb.append("Error during field separator parsing");
} else {
sb.append("Read field #");
- sb.append(fieldIndex);
+ sb.append(currentFieldIndex);
sb.append(" at field start position ");
- sb.append(startPosition[fieldIndex]);
- int currentFieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
+ sb.append(startPosition[currentFieldIndex]);
+ int currentFieldLength = startPosition[currentFieldIndex + 1] - startPosition[currentFieldIndex] - 1;
sb.append(" for field length ");
sb.append(currentFieldLength);
- sb.append(" current read offset ");
- sb.append(offset);
}
return sb.toString();
@@ -173,395 +181,406 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
*/
private void parse() {
- int structByteEnd = end;
+ int fieldId = 0;
int fieldByteBegin = start;
int fieldByteEnd = start;
- // Kept as a member variable to support getDetailedReadPositionString.
- parseFieldIndex = 0;
+ final byte separator = this.separator;
+ final int fieldCount = this.fieldCount;
+ final int[] startPosition = this.startPosition;
+ final byte[] bytes = this.bytes;
+ final int end = this.end;
- // Go through all bytes in the byte[]
- while (fieldByteEnd <= structByteEnd) {
- if (fieldByteEnd == structByteEnd || bytes[fieldByteEnd] == separator) {
- // Reached the end of a field?
- if (lastColumnTakesRest && parseFieldIndex == fieldCount - 1) {
- fieldByteEnd = structByteEnd;
- }
- startPosition[parseFieldIndex] = fieldByteBegin;
- parseFieldIndex++;
- if (parseFieldIndex == fieldCount || fieldByteEnd == structByteEnd) {
- // All fields have been parsed, or bytes have been parsed.
- // We need to set the startPosition of fields.length to ensure we
- // can use the same formula to calculate the length of each field.
- // For missing fields, their starting positions will all be the same,
- // which will make their lengths to be -1 and uncheckedGetField will
- // return these fields as NULLs.
- for (int i = parseFieldIndex; i <= fieldCount; i++) {
- startPosition[i] = fieldByteEnd + 1;
+ /*
+ * Optimize the loops by pulling special end cases and global decisions like isEscaped out!
+ */
+ if (!isEscaped) {
+ while (fieldByteEnd < end) {
+ if (bytes[fieldByteEnd] == separator) {
+ startPosition[fieldId++] = fieldByteBegin;
+ if (fieldId == fieldCount) {
+ break;
}
- break;
+ fieldByteBegin = ++fieldByteEnd;
+ } else {
+ fieldByteEnd++;
}
- fieldByteBegin = fieldByteEnd + 1;
- fieldByteEnd++;
- } else {
- if (isEscaped && bytes[fieldByteEnd] == escapeChar
- && fieldByteEnd + 1 < structByteEnd) {
- // ignore the char after escape_char
+ }
+ // End serves as final separator.
+ if (fieldByteEnd == end && fieldId < fieldCount) {
+ startPosition[fieldId++] = fieldByteBegin;
+ }
+ } else {
+ final byte escapeChar = this.escapeChar;
+ final int endLessOne = end - 1;
+ final int[] escapeCounts = this.escapeCounts;
+ int escapeCount = 0;
+ // Process the bytes that can be escaped (the last one can't be).
+ while (fieldByteEnd < endLessOne) {
+ if (bytes[fieldByteEnd] == separator) {
+ escapeCounts[fieldId] = escapeCount;
+ escapeCount = 0;
+ startPosition[fieldId++] = fieldByteBegin;
+ if (fieldId == fieldCount) {
+ break;
+ }
+ fieldByteBegin = ++fieldByteEnd;
+ } else if (bytes[fieldByteEnd] == escapeChar) {
+ // Ignore the char after escape_char
fieldByteEnd += 2;
+ escapeCount++;
+ } else {
+ fieldByteEnd++;
+ }
+ }
+ // Process the last byte if necessary.
+ if (fieldByteEnd == endLessOne && fieldId < fieldCount) {
+ if (bytes[fieldByteEnd] == separator) {
+ escapeCounts[fieldId] = escapeCount;
+ escapeCount = 0;
+ startPosition[fieldId++] = fieldByteBegin;
+ if (fieldId <= fieldCount) {
+ fieldByteBegin = ++fieldByteEnd;
+ }
} else {
fieldByteEnd++;
}
}
+ // End serves as final separator.
+ if (fieldByteEnd == end && fieldId < fieldCount) {
+ escapeCounts[fieldId] = escapeCount;
+ startPosition[fieldId++] = fieldByteBegin;
+ }
}
- // Extra bytes at the end?
- if (!extraFieldWarned && fieldByteEnd < structByteEnd) {
- doExtraFieldWarned();
+ if (fieldId == fieldCount || fieldByteEnd == end) {
+ // All fields have been parsed, or all input bytes have been consumed.
+ // We need to set startPosition[fieldCount] to ensure we can use the
+ // same formula to calculate the length of each field.
+ // For missing fields, their starting positions will all be the same,
+ // which will make their lengths -1, and uncheckedGetField will
+ // return these fields as NULLs.
+ Arrays.fill(startPosition, fieldId, startPosition.length, fieldByteEnd + 1);
}
- // Missing fields?
- if (!missingFieldWarned && parseFieldIndex < fieldCount) {
- doMissingFieldWarned(parseFieldIndex);
- }
+ isEndOfInputReached = (fieldByteEnd == end);
}
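
For illustration, the non-escaped separator scan above reduces to the following self-contained sketch (names here are hypothetical, not part of the patch). The end of the buffer acts as the final separator, and the sentinel entry at startPosition[fieldCount] makes every field length computable with one formula; missing fields come out with length -1:

    import java.util.Arrays;

    public class FieldSplitSketch {
      /**
       * Record the start offset of each field in startPosition[0..fieldCount-1],
       * plus a sentinel at startPosition[fieldCount], so that field i has length
       * startPosition[i + 1] - startPosition[i] - 1 (missing fields get -1).
       */
      static void split(byte[] bytes, int start, int end, byte separator,
          int[] startPosition, int fieldCount) {
        int fieldId = 0;
        int fieldByteBegin = start;
        int fieldByteEnd = start;
        while (fieldByteEnd < end) {
          if (bytes[fieldByteEnd] == separator) {
            startPosition[fieldId++] = fieldByteBegin;
            if (fieldId == fieldCount) {
              break;
            }
            fieldByteBegin = ++fieldByteEnd;
          } else {
            fieldByteEnd++;
          }
        }
        // The end of input serves as the final separator.
        if (fieldByteEnd == end && fieldId < fieldCount) {
          startPosition[fieldId++] = fieldByteBegin;
        }
        // Sentinel entries: missing fields all start at fieldByteEnd + 1,
        // so their computed lengths are -1 and they read back as NULL.
        Arrays.fill(startPosition, fieldId, startPosition.length, fieldByteEnd + 1);
      }

      public static void main(String[] args) {
        byte[] row = "a,bc,,d".getBytes();
        int[] starts = new int[5 + 1];
        split(row, 0, row.length, (byte) ',', starts, 5);
        for (int i = 0; i < 5; i++) {
          // Prints lengths 1, 2, 0, 1, -1 (the fifth field is missing).
          System.out.println("field " + i + " length " + (starts[i + 1] - starts[i] - 1));
        }
      }
    }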
/*
- * Reads the NULL information for a field.
+ * Reads the next field.
+ *
+ * Afterwards, reading is positioned to the next field.
+ *
+ * @return True when the field was not null and its data was placed in the
+ * appropriate current* member; false when the field was null.
*
- * @return Returns true when the field is NULL; reading is positioned to the next field.
- * Otherwise, false when the field is NOT NULL; reading is positioned to the field data.
*/
@Override
- public boolean readCheckNull() {
- if (fieldIndex == -1) {
+ public boolean readNextField() throws IOException {
+ if (nextFieldIndex + 1 >= fieldCount) {
+ return false;
+ }
+ nextFieldIndex++;
+ return readField(nextFieldIndex);
+ }
+
+ /*
+ * Reads through an undesired field.
+ *
+ * No data values are valid after this call.
+ * Designed for skipping columns that are not included.
+ */
+ public void skipNextField() throws IOException {
+ if (!parsed) {
parse();
- fieldIndex = 0;
- } else if (fieldIndex + 1 >= fieldCount) {
- return true;
+ parsed = true;
+ }
+ if (nextFieldIndex + 1 >= fieldCount) {
+ // No more.
} else {
- fieldIndex++;
+ nextFieldIndex++;
}
+ }
- // Do we want this field?
- if (columnsToInclude != null && !columnsToInclude[fieldIndex]) {
+ @Override
+ public boolean isReadFieldSupported() {
+ return true;
+ }
+
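
The point of the readNextField/skipNextField split is column pruning: a caller that needs only a subset of columns can advance past the others without decoding them. A hypothetical caller loop might look like the sketch below; includeColumns, consumeField, and consumeNull are illustrative names, not API from this patch:

    // Hypothetical caller-side pruning loop (sketch only).
    static void readRow(LazySimpleDeserializeRead reader, boolean[] includeColumns,
        int fieldCount) throws IOException {
      for (int i = 0; i < fieldCount; i++) {
        if (!includeColumns[i]) {
          // Parse position advances; no field data is decoded or copied.
          reader.skipNextField();
        } else if (reader.readNextField()) {
          // Field was non-null; the matching current* member now holds the value.
          consumeField(i, reader);
        } else {
          // Field was null (missing, the null sequence, or unparseable).
          consumeNull(i);
        }
      }
    }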
+ private boolean checkNull(byte[] bytes, int start, int len) {
+ if (len != nullSequenceBytes.length) {
+ return false;
+ }
+ final byte[] nullSequenceBytes = this.nullSequenceBytes;
+ switch (len) {
+ case 0:
+ return true;
+ case 2:
+ return bytes[start] == nullSequenceBytes[0] && bytes[start+1] == nullSequenceBytes[1];
+ case 4:
+ return bytes[start] == nullSequenceBytes[0] && bytes[start+1] == nullSequenceBytes[1]
+ && bytes[start+2] == nullSequenceBytes[2] && bytes[start+3] == nullSequenceBytes[3];
+ default:
+ for (int i = 0; i < nullSequenceBytes.length; i++) {
+ if (bytes[start + i] != nullSequenceBytes[i]) {
+ return false;
+ }
+ }
return true;
}
+ }
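
For reference, the unrolled switch above is behaviorally equivalent to the plain loop below; the length-specialized cases (0, 2, and 4 bytes) exist because common null sequences such as the default "\N" are short, and the direct comparisons avoid loop overhead on the per-field hot path. A sketch, not part of the patch:

    // Generic equivalent of checkNull (sketch): compare the field bytes
    // against the configured null sequence byte-for-byte.
    static boolean isNullSequence(byte[] bytes, int start, int len, byte[] nullSeq) {
      if (len != nullSeq.length) {
        return false;
      }
      for (int i = 0; i < len; i++) {
        if (bytes[start + i] != nullSeq[i]) {
          return false;
        }
      }
      return true;
    }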
- fieldStart = startPosition[fieldIndex];
- fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
+ /*
+ * When supported, read a field by field number (i.e. random access).
+ *
+ * Currently, only LazySimpleDeserializeRead supports this.
+ *
+ * @return True when the field was not null and its data was placed in the
+ * appropriate current* member; false when the field was null.
+ */
+ public boolean readField(int fieldIndex) throws IOException {
+
+ if (!parsed) {
+ parse();
+ parsed = true;
+ }
+
+ currentFieldIndex = fieldIndex;
+
+ final int fieldStart = startPosition[fieldIndex];
+ currentFieldStart = fieldStart;
+ final int fieldLength = startPosition[fieldIndex + 1] - startPosition[fieldIndex] - 1;
+ currentFieldLength = fieldLength;
if (fieldLength < 0) {
- return true;
+ return false;
}
+ final byte[] bytes = this.bytes;
+
// Is the field the configured string representing NULL?
if (nullSequenceBytes != null) {
- if (fieldLength == nullSequenceBytes.length) {
- int i = 0;
- while (true) {
- if (bytes[fieldStart + i] != nullSequenceBytes[i]) {
- break;
- }
- i++;
- if (i >= fieldLength) {
- return true;
- }
- }
+ if (checkNull(bytes, fieldStart, fieldLength)) {
+ return false;
}
}
- /*
- * We have a field and are positioned to it. Read it.
- */
- switch (primitiveCategories[fieldIndex]) {
- case BOOLEAN:
- {
- int i = fieldStart;
- if (fieldLength == 4) {
- if ((bytes[i] == 'T' || bytes[i] == 't') &&
- (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
- (bytes[i + 2] == 'U' || bytes[i + 1] == 'u') &&
- (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
- currentBoolean = true;
- } else {
- // No boolean value match for 5 char field.
- return true;
- }
- } else if (fieldLength == 5) {
- if ((bytes[i] == 'F' || bytes[i] == 'f') &&
- (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
- (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
- (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
- (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
- currentBoolean = false;
- } else {
- // No boolean value match for 4 char field.
- return true;
- }
- } else if (isExtendedBooleanLiteral && fieldLength == 1) {
- byte b = bytes[fieldStart];
- if (b == '1' || b == 't' || b == 'T') {
- currentBoolean = true;
- } else if (b == '0' || b == 'f' || b == 'F') {
- currentBoolean = false;
+ try {
+ /*
+ * We have a field and are positioned to it. Read it.
+ */
+ switch (primitiveCategories[fieldIndex]) {
+ case BOOLEAN:
+ {
+ int i = fieldStart;
+ if (fieldLength == 4) {
+ if ((bytes[i] == 'T' || bytes[i] == 't') &&
+ (bytes[i + 1] == 'R' || bytes[i + 1] == 'r') &&
+ (bytes[i + 2] == 'U' || bytes[i + 2] == 'u') &&
+ (bytes[i + 3] == 'E' || bytes[i + 3] == 'e')) {
+ currentBoolean = true;
+ } else {
+ // No boolean value match for 4 char field.
+ return false;
+ }
+ } else if (fieldLength == 5) {
+ if ((bytes[i] == 'F' || bytes[i] == 'f') &&
+ (bytes[i + 1] == 'A' || bytes[i + 1] == 'a') &&
+ (bytes[i + 2] == 'L' || bytes[i + 2] == 'l') &&
+ (bytes[i + 3] == 'S' || bytes[i + 3] == 's') &&
+ (bytes[i + 4] == 'E' || bytes[i + 4] == 'e')) {
+ currentBoolean = false;
+ } else {
+ // No boolean value match for 5 char field.
+ return false;
+ }
+ } else if (isExtendedBooleanLiteral && fieldLength == 1) {
+ byte b = bytes[fieldStart];
+ if (b == '1' || b == 't' || b == 'T') {
+ currentBoolean = true;
+ } else if (b == '0' || b == 'f' || b == 'F') {
+ currentBoolean = false;
+ } else {
+ // No boolean value match for extended 1 char field.
+ return false;
+ }
} else {
- // No boolean value match for extended 1 char field.
- return true;
+ // No boolean value match for other lengths.
+ return false;
}
- } else {
- // No boolean value match for other lengths.
- return true;
}
- }
- break;
- case BYTE:
- if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
return true;
- }
- try {
+ case BYTE:
+ if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+ return false;
+ }
currentByte = LazyByte.parseByte(bytes, fieldStart, fieldLength, 10);
- } catch (NumberFormatException e) {
- logExceptionMessage(bytes, fieldStart, fieldLength, "TINYINT");
- return true;
- }
- break;
- case SHORT:
- if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
return true;
- }
- try {
+ case SHORT:
+ if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+ return false;
+ }
currentShort = LazyShort.parseShort(bytes, fieldStart, fieldLength, 10);
- } catch (NumberFormatException e) {
- logExceptionMessage(bytes, fieldStart, fieldLength, "SMALLINT");
- return true;
- }
- break;
- case INT:
- if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
return true;
- }
- try {
+ case INT:
+ if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+ return false;
+ }
currentInt = LazyInteger.parseInt(bytes, fieldStart, fieldLength, 10);
- } catch (NumberFormatException e) {
- logExceptionMessage(bytes, fieldStart, fieldLength, "INT");
- return true;
- }
- break;
- case LONG:
- if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
return true;
- }
- try {
+ case LONG:
+ if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+ return false;
+ }
currentLong = LazyLong.parseLong(bytes, fieldStart, fieldLength, 10);
- } catch (NumberFormatException e) {
- logExceptionMessage(bytes, fieldStart, fieldLength, "BIGINT");
return true;
- }
- break;
- case FLOAT:
- {
+ case FLOAT:
if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
- return true;
- }
- String byteData = null;
- try {
- byteData = Text.decode(bytes, fieldStart, fieldLength);
- currentFloat = Float.parseFloat(byteData);
- } catch (NumberFormatException e) {
- LOG.debug("Data not in the Float data type range so converted to null. Given data is :"
- + byteData, e);
- return true;
- } catch (CharacterCodingException e) {
- LOG.debug("Data not in the Float data type range so converted to null.", e);
- return true;
+ return false;
}
- }
- break;
- case DOUBLE:
- {
+ currentFloat =
+ Float.parseFloat(
+ new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+ return true;
+ case DOUBLE:
if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
- return true;
+ return false;
}
- String byteData = null;
- try {
- byteData = Text.decode(bytes, fieldStart, fieldLength);
- currentDouble = Double.parseDouble(byteData);
- } catch (NumberFormatException e) {
- LOG.debug("Data not in the Double data type range so converted to null. Given data is :"
- + byteData, e);
- return true;
- } catch (CharacterCodingException e) {
- LOG.debug("Data not in the Double data type range so converted to null.", e);
- return true;
- }
- }
- break;
-
- case STRING:
- case CHAR:
- case VARCHAR:
- {
- if (isEscaped) {
- // First calculate the length of the output string
- int outputLength = 0;
- for (int i = 0; i < fieldLength; i++) {
- if (bytes[fieldStart + i] != escapeChar) {
- outputLength++;
+ currentDouble =
+ Double.parseDouble(
+ new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8));
+ return true;
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ {
+ if (isEscaped) {
+ if (escapeCounts[fieldIndex] == 0) {
+ // No escaping.
+ currentExternalBufferNeeded = false;
+ currentBytes = bytes;
+ currentBytesStart = fieldStart;
+ currentBytesLength = fieldLength;
} else {
- outputLength++;
- i++;
+ final int unescapedLength = fieldLength - escapeCounts[fieldIndex];
+ if (useExternalBuffer) {
+ currentExternalBufferNeeded = true;
+ currentExternalBufferNeededLen = unescapedLength;
+ } else {
+ // The copyToBuffer will reposition and re-read the input buffer.
+ currentExternalBufferNeeded = false;
+ if (internalBufferLen < unescapedLength) {
+ internalBufferLen = unescapedLength;
+ internalBuffer = new byte[internalBufferLen];
+ }
+ copyToBuffer(internalBuffer, 0, unescapedLength);
+ currentBytes = internalBuffer;
+ currentBytesStart = 0;
+ currentBytesLength = unescapedLength;
+ }
}
- }
- if (outputLength == fieldLength) {
- // No escaping.
+ } else {
+ // If the data is not escaped, reference the data directly.
currentExternalBufferNeeded = false;
currentBytes = bytes;
currentBytesStart = fieldStart;
- currentBytesLength = outputLength;
- } else {
- if (useExternalBuffer) {
- currentExternalBufferNeeded = true;
- currentExternalBufferNeededLen = outputLength;
- } else {
- // The copyToBuffer will reposition and re-read the input buffer.
- currentExternalBufferNeeded = false;
- if (internalBufferLen < outputLength) {
- internalBufferLen = outputLength;
- internalBuffer = new byte[internalBufferLen];
- }
- copyToBuffer(internalBuffer, 0, outputLength);
- currentBytes = internalBuffer;
- currentBytesStart = 0;
- currentBytesLength = outputLength;
- }
+ currentBytesLength = fieldLength;
}
- } else {
- // If the data is not escaped, reference the data directly.
- currentExternalBufferNeeded = false;
- currentBytes = bytes;
- currentBytesStart = fieldStart;
- currentBytesLength = fieldLength;
}
- }
- break;
- case BINARY:
- {
- byte[] recv = new byte[fieldLength];
- System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
- byte[] decoded = LazyBinary.decodeIfNeeded(recv);
- // use the original bytes in case decoding should fail
- decoded = decoded.length > 0 ? decoded : recv;
- currentBytes = decoded;
- currentBytesStart = 0;
- currentBytesLength = decoded.length;
- }
- break;
- case DATE:
- {
- if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
- return true;
- }
- String s = null;
- try {
- s = Text.decode(bytes, fieldStart, fieldLength);
- currentDateWritable.set(Date.valueOf(s));
- } catch (Exception e) {
- logExceptionMessage(bytes, fieldStart, fieldLength, "DATE");
- return true;
+ return true;
+ case BINARY:
+ {
+ byte[] recv = new byte[fieldLength];
+ System.arraycopy(bytes, fieldStart, recv, 0, fieldLength);
+ byte[] decoded = LazyBinary.decodeIfNeeded(recv);
+ // Use the original bytes in case decoding fails.
+ decoded = decoded.length > 0 ? decoded : recv;
+ currentBytes = decoded;
+ currentBytesStart = 0;
+ currentBytesLength = decoded.length;
}
- }
- break;
- case TIMESTAMP:
- {
+ return true;
+ case DATE:
if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
- return true;
+ return false;
}
- String s = null;
- try {
- s = new String(bytes, fieldStart, fieldLength, "US-ASCII");
- } catch (UnsupportedEncodingException e) {
- LOG.error("Unsupported encoding found ", e);
- s = "";
- }
-
- if (s.compareTo("NULL") == 0) {
- logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
- return true;
- } else {
+ currentDateWritable.set(
+ Date.valueOf(
+ new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8)));
+ return true;
+ case TIMESTAMP:
+ {
+ if (!LazyUtils.isDateMaybe(bytes, fieldStart, fieldLength)) {
+ return false;
+ }
+ String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.US_ASCII);
+ if (s.compareTo("NULL") == 0) {
+ logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
+ return false;
+ }
try {
- if (timestampParser == null) {
- timestampParser = new TimestampParser();
- }
currentTimestampWritable.set(timestampParser.parseTimestamp(s));
} catch (IllegalArgumentException e) {
logExceptionMessage(bytes, fieldStart, fieldLength, "TIMESTAMP");
- return true;
+ return false;
}
}
- }
- break;
- case INTERVAL_YEAR_MONTH:
- {
+ return true;
+ case INTERVAL_YEAR_MONTH:
if (fieldLength == 0) {
- return true;
+ return false;
}
- String s = null;
try {
- s = Text.decode(bytes, fieldStart, fieldLength);
+ String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
currentHiveIntervalYearMonthWritable.set(HiveIntervalYearMonth.valueOf(s));
} catch (Exception e) {
logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_YEAR_MONTH");
- return true;
+ return false;
}
- }
- break;
- case INTERVAL_DAY_TIME:
- {
+ return true;
+ case INTERVAL_DAY_TIME:
if (fieldLength == 0) {
- return true;
+ return false;
}
- String s = null;
try {
- s = Text.decode(bytes, fieldStart, fieldLength);
+ String s = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
currentHiveIntervalDayTimeWritable.set(HiveIntervalDayTime.valueOf(s));
} catch (Exception e) {
logExceptionMessage(bytes, fieldStart, fieldLength, "INTERVAL_DAY_TIME");
- return true;
+ return false;
}
- }
- break;
- case DECIMAL:
- {
- if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
- return true;
- }
- String byteData = null;
- try {
- byteData = Text.decode(bytes, fieldStart, fieldLength);
- } catch (CharacterCodingException e) {
- LOG.debug("Data not in the HiveDecimal data type range so converted to null.", e);
- return true;
+ return true;
+ case DECIMAL:
+ {
+ if (!LazyUtils.isNumberMaybe(bytes, fieldStart, fieldLength)) {
+ return false;
+ }
+ String byteData = new String(bytes, fieldStart, fieldLength, StandardCharsets.UTF_8);
+ HiveDecimal decimal = HiveDecimal.create(byteData);
+ DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
+ int precision = decimalTypeInfo.getPrecision();
+ int scale = decimalTypeInfo.getScale();
+ decimal = HiveDecimal.enforcePrecisionScale(decimal, precision, scale);
+ if (decimal == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
+ + byteData);
+ }
+ return false;
+ }
+ currentHiveDecimalWritable.set(decimal);
}
+ return true;
- HiveDecimal decimal = HiveDecimal.create(byteData);
- DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfos[fieldIndex];
- int precision = decimalTypeInfo.getPrecision();
- int scale = decimalTypeInfo.getScale();
- decimal = HiveDecimal.enforcePrecisionScale(
- decimal, precision, scale);
- if (decimal == null) {
- LOG.debug("Data not in the HiveDecimal data type range so converted to null. Given data is :"
- + byteData);
- return true;
- }
- currentHiveDecimalWritable.set(decimal);
+ default:
+ throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
}
- break;
-
- default:
- throw new Error("Unexpected primitive category " + primitiveCategories[fieldIndex].name());
+ } catch (NumberFormatException nfe) {
+ // A U+FFFD replacement character in the decoded text can trigger this as well.
+ logExceptionMessage(bytes, fieldStart, fieldLength, primitiveCategories[fieldIndex]);
+ return false;
}
-
- return false;
}
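
Random access works because parse() precomputes every field offset up front, so fields can be read in any order and the same length formula applies to each. A hypothetical projection loop over the random-access path (projectedFields is an illustrative name):

    // Hypothetical random-access projection (sketch): read only the projected
    // field numbers; order does not matter because parse() precomputed all
    // field offsets.
    static void readProjected(LazySimpleDeserializeRead reader, int[] projectedFields)
        throws IOException {
      for (int fieldIndex : projectedFields) {
        if (reader.readField(fieldIndex)) {
          // Non-null: the current* member matching the field's type is set.
        } else {
          // Null: missing trailing field, the null sequence, or a parse failure.
        }
      }
    }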
@Override
@@ -570,6 +589,8 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
}
private void copyToBuffer(byte[] buffer, int bufferStart, int bufferLength) {
+
+ final int fieldStart = currentFieldStart;
int k = 0;
for (int i = 0; i < bufferLength; i++) {
byte b = bytes[fieldStart + i];
@@ -590,9 +611,44 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
}
}
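
The elided interior of copyToBuffer is the unescape copy: it walks the raw field bytes and, on each escape character, emits the byte that follows it instead. A minimal standalone sketch of that technique, assuming Hive's text convention of mapping escaped 'r'/'n' to CR/LF (this is not the patch's exact body):

    // Standalone unescape sketch: copy src[start, start + len) into dst,
    // dropping each escape character and emitting the byte that follows it.
    static int unescape(byte[] src, int start, int len, byte escapeChar, byte[] dst) {
      int k = 0;
      for (int i = 0; i < len; i++) {
        byte b = src[start + i];
        if (b == escapeChar && i + 1 < len) {
          byte next = src[start + ++i];
          if (next == 'r') {
            dst[k++] = (byte) '\r';
          } else if (next == 'n') {
            dst[k++] = (byte) '\n';
          } else {
            dst[k++] = next;
          }
        } else {
          dst[k++] = b;
        }
      }
      return k; // Unescaped length, i.e. fieldLength minus the escape count.
    }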
+ /*
+ * This method may be called after all fields have been read to check
+ * whether the entire input was consumed.
+ *
+ * Note that when reading is optimized to stop at the last needed column,
+ * worrying about whether all data was consumed is not appropriate (often
+ * we aren't reading it all by design).
+ *
+ * Since LazySimpleDeserializeRead parses the line through the last desired
+ * column, it does support this function.
+ */
+ public boolean isEndOfInputReached() {
+ return isEndOfInputReached;
+ }
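
This replaces the removed doExtraFieldWarned bookkeeping: instead of warning inside the reader, the caller decides what to do with leftover input. A hypothetical caller-side equivalent of the old warning (extraFieldWarned is an illustrative field the caller would keep):

    // Hypothetical caller-side replacement for the removed doExtraFieldWarned().
    if (!reader.isEndOfInputReached() && !extraFieldWarned) {
      extraFieldWarned = true;
      LOG.warn("Extra bytes detected at the end of the row! Ignoring similar "
          + "problems.");
    }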
+
+ public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength,
+ PrimitiveCategory dataCategory) {
+ final String dataType;
+ switch (dataCategory) {
+ case BYTE:
+ dataType = "TINYINT";
+ break;
+ case LONG:
+ dataType = "BIGINT";
+ break;
+ case SHORT:
+ dataType = "SMALLINT";
+ break;
+ default:
+ dataType = dataCategory.toString();
+ break;
+ }
+ logExceptionMessage(bytes, bytesStart, bytesLength, dataType);
+ }
+
public void logExceptionMessage(byte[] bytes, int bytesStart, int bytesLength, String dataType) {
try {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
String byteData = Text.decode(bytes, bytesStart, bytesLength);
LOG.debug("Data not in the " + dataType
+ " data type range so converted to null. Given data is :" +
@@ -603,38 +659,6 @@ public final class LazySimpleDeserializeRead extends DeserializeRead {
}
}
- /*
- * Call this method after all fields have been read to check for extra fields.
- */
- @Override
- public void extraFieldsCheck() {
- // UNDONE: Get rid of...
- }
-
- /*
- * Read integrity warning flags.
- */
- @Override
- public boolean readBeyondConfiguredFieldsWarned() {
- return missingFieldWarned;
- }
- @Override
- public boolean bufferRangeHasExtraDataWarned() {
- return false;
- }
-
- private void doExtraFieldWarned() {
- extraFieldWarned = true;
- LOG.warn("Extra bytes detected at the end of the row! Ignoring similar "
- + "problems.");
- }
-
- private void doMissingFieldWarned(int fieldId) {
- missingFieldWarned = true;
- LOG.info("Missing fields! Expected " + fieldCount + " fields but "
- + "only got " + fieldId + "! Ignoring similar problems.");
- }
-
//------------------------------------------------------------------------------------------------
private static byte[] maxLongBytes = ((Long) Long.MAX_VALUE).toString().getBytes();