Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:48 UTC

[17/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC
Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline,
reviewed by Prasanth J) HIVE-12728: Apply DDL restrictions for ORC schema
evolution (Prasanth Jayachan
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index 525b3c5..41dc3e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -17,12 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
/**
* This class represents a nullable double precision floating point column vector.
* This class will be used for operations on all floating point types (float, double)
@@ -36,7 +33,6 @@ import org.apache.hadoop.io.Writable;
*/
public class DoubleColumnVector extends ColumnVector {
public double[] vector;
- private final DoubleWritable writableObj = new DoubleWritable();
public static final double NULL_VALUE = Double.NaN;
/**
@@ -57,19 +53,6 @@ public class DoubleColumnVector extends ColumnVector {
vector = new double[len];
}
- @Override
- public Writable getWritableObject(int index) {
- if (this.isRepeating) {
- index = 0;
- }
- if (!noNulls && isNull[index]) {
- return NullWritable.get();
- } else {
- writableObj.set(vector[index]);
- return writableObj;
- }
- }
-
// Copy the current object contents into the output. Only copy selected entries,
// as indicated by selectedInUse and the sel array.
public void copySelected(
@@ -121,6 +104,14 @@ public class DoubleColumnVector extends ColumnVector {
vector[0] = value;
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = NULL_VALUE;
+ isNull[0] = true;
+ }
+
// Simplify vector by brute-force flattening noNulls and isRepeating
// This can be used to reduce combinatorial explosion of code paths in VectorExpressions
// with many arguments.
@@ -144,6 +135,44 @@ public class DoubleColumnVector extends ColumnVector {
@Override
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
- vector[outElementNum] = ((DoubleColumnVector) inputVector).vector[inputElementNum];
+ if (inputVector.isRepeating) {
+ inputElementNum = 0;
+ }
+ if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+ isNull[outElementNum] = false;
+ vector[outElementNum] =
+ ((DoubleColumnVector) inputVector).vector[inputElementNum];
+ } else {
+ isNull[outElementNum] = true;
+ noNulls = false;
+ }
+ }
+
+ @Override
+ public void stringifyValue(StringBuilder buffer, int row) {
+ if (isRepeating) {
+ row = 0;
+ }
+ if (noNulls || !isNull[row]) {
+ buffer.append(vector[row]);
+ } else {
+ buffer.append("null");
+ }
+ }
+
+ @Override
+ public void ensureSize(int size, boolean preserveData) {
+ if (size > vector.length) {
+ super.ensureSize(size, preserveData);
+ double[] oldArray = vector;
+ vector = new double[size];
+ if (preserveData) {
+ if (isRepeating) {
+ vector[0] = oldArray[0];
+ } else {
+ System.arraycopy(oldArray, 0, vector, 0 , oldArray.length);
+ }
+ }
+ }
}
}
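
The setElement(), stringifyValue(), fillWithNulls(), and ensureSize() additions above change how nulls flow between vectors: setElement() now honors the input vector's isRepeating and isNull state instead of copying a raw value, and fillWithNulls() collapses the column to one repeating null entry. A minimal sketch of the new behavior (not part of the patch; the demo class name is hypothetical and assumes the patched hive-exec classes are on the classpath):

import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;

public class DoubleColumnVectorNullDemo {
  public static void main(String[] args) {
    DoubleColumnVector in = new DoubleColumnVector(1024);
    DoubleColumnVector out = new DoubleColumnVector(1024);

    // Mark input row 5 as null; row 6 holds a real value.
    in.noNulls = false;
    in.isNull[5] = true;
    in.vector[6] = 3.14;

    out.setElement(0, 5, in);   // copies the null flag, not vector[5]
    out.setElement(1, 6, in);   // copies the value 3.14

    StringBuilder sb = new StringBuilder();
    out.stringifyValue(sb, 0);  // appends "null"
    sb.append(' ');
    out.stringifyValue(sb, 1);  // appends "3.14"
    System.out.println(sb);

    // Collapse the whole column to a single repeating null entry.
    out.fillWithNulls();
    System.out.println(out.isRepeating + " " + out.isNull[0]);  // true true
  }
}
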
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index f0545fe..0afe5db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@ -17,12 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.vector;
+import java.io.IOException;
import java.util.Arrays;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
/**
* This class represents a nullable int column vector.
* This class will be used for operations on all integer types (tinyint, smallint, int, bigint)
@@ -36,7 +33,6 @@ import org.apache.hadoop.io.Writable;
*/
public class LongColumnVector extends ColumnVector {
public long[] vector;
- private final LongWritable writableObj = new LongWritable();
public static final long NULL_VALUE = 1;
/**
@@ -50,26 +46,13 @@ public class LongColumnVector extends ColumnVector {
/**
* Don't use this except for testing purposes.
*
- * @param len
+ * @param len the number of rows
*/
public LongColumnVector(int len) {
super(len);
vector = new long[len];
}
- @Override
- public Writable getWritableObject(int index) {
- if (this.isRepeating) {
- index = 0;
- }
- if (!noNulls && isNull[index]) {
- return NullWritable.get();
- } else {
- writableObj.set(vector[index]);
- return writableObj;
- }
- }
-
// Copy the current object contents into the output. Only copy selected entries,
// as indicated by selectedInUse and the sel array.
public void copySelected(
@@ -141,7 +124,9 @@ public class LongColumnVector extends ColumnVector {
}
}
else {
- System.arraycopy(vector, 0, output.vector, 0, size);
+ for(int i = 0; i < size; ++i) {
+ output.vector[i] = vector[i];
+ }
}
// Copy nulls over if needed
@@ -165,6 +150,14 @@ public class LongColumnVector extends ColumnVector {
vector[0] = value;
}
+ // Fill the column vector with nulls
+ public void fillWithNulls() {
+ noNulls = false;
+ isRepeating = true;
+ vector[0] = NULL_VALUE;
+ isNull[0] = true;
+ }
+
// Simplify vector by brute-force flattening noNulls and isRepeating
// This can be used to reduce combinatorial explosion of code paths in VectorExpressions
// with many arguments.
@@ -188,6 +181,44 @@ public class LongColumnVector extends ColumnVector {
@Override
public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
- vector[outElementNum] = ((LongColumnVector) inputVector).vector[inputElementNum];
+ if (inputVector.isRepeating) {
+ inputElementNum = 0;
+ }
+ if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+ isNull[outElementNum] = false;
+ vector[outElementNum] =
+ ((LongColumnVector) inputVector).vector[inputElementNum];
+ } else {
+ isNull[outElementNum] = true;
+ noNulls = false;
+ }
+ }
+
+ @Override
+ public void stringifyValue(StringBuilder buffer, int row) {
+ if (isRepeating) {
+ row = 0;
+ }
+ if (noNulls || !isNull[row]) {
+ buffer.append(vector[row]);
+ } else {
+ buffer.append("null");
+ }
+ }
+
+ @Override
+ public void ensureSize(int size, boolean preserveData) {
+ if (size > vector.length) {
+ super.ensureSize(size, preserveData);
+ long[] oldArray = vector;
+ vector = new long[size];
+ if (preserveData) {
+ if (isRepeating) {
+ vector[0] = oldArray[0];
+ } else {
+ System.arraycopy(oldArray, 0, vector, 0 , oldArray.length);
+ }
+ }
+ }
}
}
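
LongColumnVector receives the same null-propagation fixes; the ensureSize() override above additionally grows the backing long[] in place and can preserve existing values. A small sketch under the same assumptions (hypothetical class name, patched classes on the classpath):

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class LongColumnVectorEnsureSizeDemo {
  public static void main(String[] args) {
    LongColumnVector col = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    for (int i = 0; i < col.vector.length; i++) {
      col.vector[i] = i;
    }
    int oldLen = col.vector.length;

    // Grow to twice the old capacity; preserveData=true copies the
    // old values into the new, larger array.
    col.ensureSize(oldLen * 2, true);

    System.out.println(col.vector.length >= oldLen * 2);        // true
    System.out.println(col.vector[oldLen - 1] == oldLen - 1);   // true
  }
}
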
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 8452abd..56cf9ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -31,15 +31,10 @@ import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.fast.DeserializeRead;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hive.common.util.DateUtils;
@@ -61,12 +56,12 @@ public class VectorDeserializeRow {
private Reader[] readersByValue;
private Reader[] readersByReference;
- private PrimitiveTypeInfo[] primitiveTypeInfos;
+ private TypeInfo[] typeInfos;
public VectorDeserializeRow(DeserializeRead deserializeRead) {
this();
this.deserializeRead = deserializeRead;
- primitiveTypeInfos = deserializeRead.primitiveTypeInfos();
+ typeInfos = deserializeRead.typeInfos();
}
@@ -564,7 +559,7 @@ public class VectorDeserializeRow {
Reader readerByValue = null;
Reader readerByReference = null;
- PrimitiveTypeInfo primitiveTypeInfo = primitiveTypeInfos[index];
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfos[index];
PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
switch (primitiveCategory) {
// case VOID:
@@ -642,10 +637,10 @@ public class VectorDeserializeRow {
public void init(int[] outputColumns) throws HiveException {
- readersByValue = new Reader[primitiveTypeInfos.length];
- readersByReference = new Reader[primitiveTypeInfos.length];
+ readersByValue = new Reader[typeInfos.length];
+ readersByReference = new Reader[typeInfos.length];
- for (int i = 0; i < primitiveTypeInfos.length; i++) {
+ for (int i = 0; i < typeInfos.length; i++) {
int outputColumn = outputColumns[i];
addReader(i, outputColumn);
}
@@ -653,10 +648,10 @@ public class VectorDeserializeRow {
public void init(List<Integer> outputColumns) throws HiveException {
- readersByValue = new Reader[primitiveTypeInfos.length];
- readersByReference = new Reader[primitiveTypeInfos.length];
+ readersByValue = new Reader[typeInfos.length];
+ readersByReference = new Reader[typeInfos.length];
- for (int i = 0; i < primitiveTypeInfos.length; i++) {
+ for (int i = 0; i < typeInfos.length; i++) {
int outputColumn = outputColumns.get(i);
addReader(i, outputColumn);
}
@@ -664,10 +659,10 @@ public class VectorDeserializeRow {
public void init(int startColumn) throws HiveException {
- readersByValue = new Reader[primitiveTypeInfos.length];
- readersByReference = new Reader[primitiveTypeInfos.length];
+ readersByValue = new Reader[typeInfos.length];
+ readersByReference = new Reader[typeInfos.length];
- for (int i = 0; i < primitiveTypeInfos.length; i++) {
+ for (int i = 0; i < typeInfos.length; i++) {
int outputColumn = startColumn + i;
addReader(i, outputColumn);
}
@@ -709,14 +704,14 @@ public class VectorDeserializeRow {
private void throwMoreDetailedException(IOException e, int index) throws EOFException {
StringBuilder sb = new StringBuilder();
- sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " + primitiveTypeInfos.length + " fields (");
- for (int i = 0; i < primitiveTypeInfos.length; i++) {
+ sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " + typeInfos.length + " fields (");
+ for (int i = 0; i < typeInfos.length; i++) {
if (i > 0) {
sb.append(", ");
}
- sb.append(primitiveTypeInfos[i].getPrimitiveCategory().name());
+ sb.append(((PrimitiveTypeInfo) typeInfos[i]).getPrimitiveCategory().name());
}
sb.append(")");
throw new EOFException(sb.toString());
}
-}
\ No newline at end of file
+}
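
The rename from primitiveTypeInfos to typeInfos is a real type change: DeserializeRead now reports general TypeInfo objects, and the downcast to PrimitiveTypeInfo happens only where a primitive reader is actually built (addReader() above). A standalone sketch of that pattern (not taken from the patch):

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeInfoDowncastDemo {
  public static void main(String[] args) {
    TypeInfo[] typeInfos = {
        TypeInfoUtils.getTypeInfoFromTypeString("int"),
        TypeInfoUtils.getTypeInfoFromTypeString("string"),
        TypeInfoUtils.getTypeInfoFromTypeString("map<string,int>")
    };
    for (TypeInfo ti : typeInfos) {
      switch (ti.getCategory()) {
        case PRIMITIVE: {
          // Safe downcast: only primitive entries reach reader creation.
          PrimitiveCategory pc = ((PrimitiveTypeInfo) ti).getPrimitiveCategory();
          System.out.println("primitive: " + pc);
          break;
        }
        default:
          System.out.println("complex: " + ti.getCategory());
      }
    }
  }
}
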
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index ee6939d..9774f0c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -468,6 +468,9 @@ public abstract class VectorExtractRow {
int start = colVector.start[adjustedIndex];
int length = colVector.length[adjustedIndex];
+ if (value == null) {
+ LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex);
+ }
// Use org.apache.hadoop.io.Text as our helper to go from byte[] to String.
text.set(value, start, length);
@@ -727,9 +730,9 @@ public abstract class VectorExtractRow {
}
public void extractRow(int batchIndex, Object[] objects) {
- int i = 0;
- for (Extractor extracter : extracters) {
- objects[i++] = extracter.extract(batchIndex);
+ for (int i = 0; i < extracters.length; i++) {
+ Extractor extracter = extracters[i];
+ objects[i] = extracter.extract(batchIndex);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 39a83e3..fa66964 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -814,7 +814,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
outputFieldNames, objectInspectors);
if (isVectorOutput) {
vrbCtx = new VectorizedRowBatchCtx();
- vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector);
+ vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());
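
The vrbCtx.init() change here (repeated in the two map-join operators below) replaces the old scratch-column type map argument with the output object inspector plus an array of scratch type names. A self-contained sketch of the new calling convention, with placeholder column names and scratch types:

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class VrbCtxInitDemo {
  public static void main(String[] args) throws Exception {
    // A trivial two-column row shape: (a bigint, b string).
    StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("a", "b"),
        Arrays.asList(
            (ObjectInspector) PrimitiveObjectInspectorFactory.javaLongObjectInspector,
            PrimitiveObjectInspectorFactory.javaStringObjectInspector));

    // New convention: object inspector first, then scratch type names
    // (the old API took a Map<Integer, String> of scratch types).
    VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
    vrbCtx.init(rowOI, new String[] {"double"});

    VectorizedRowBatch batch = vrbCtx.createVectorizedRowBatch();
    // Expected 3: two row columns plus one scratch column.
    System.out.println(batch.numCols);
  }
}
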
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index 0baec2c..9920e9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -91,7 +91,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
Collection<Future<?>> result = super.initializeOp(hconf);
vrbCtx = new VectorizedRowBatchCtx();
- vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+ vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
@@ -182,4 +182,4 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
public VectorizationContext getOuputVectorizationContext() {
return vOutContext;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index 804ba17..66190ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -146,7 +146,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
Collection<Future<?>> result = super.initializeOp(hconf);
vrbCtx = new VectorizedRowBatchCtx();
- vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+ vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
outputBatch = vrbCtx.createVectorizedRowBatch();
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
index 342bf67..5586944 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
@@ -22,8 +22,6 @@ import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;
-import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
-import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -604,13 +602,13 @@ public class VectorSerializeRow {
}
}
- public void init(PrimitiveTypeInfo[] primitiveTypeInfos, List<Integer> columnMap)
+ public void init(TypeInfo[] typeInfos, int[] columnMap)
throws HiveException {
- writers = new Writer[primitiveTypeInfos.length];
- for (int i = 0; i < primitiveTypeInfos.length; i++) {
- int columnIndex = columnMap.get(i);
- Writer writer = createWriter(primitiveTypeInfos[i], columnIndex);
+ writers = new Writer[typeInfos.length];
+ for (int i = 0; i < typeInfos.length; i++) {
+ int columnIndex = columnMap[i];
+ Writer writer = createWriter(typeInfos[i], columnIndex);
writers[i] = writer;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index da89e38..ea03099 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -145,6 +145,8 @@ public class VectorizationContext {
VectorExpressionDescriptor vMap;
+ private List<String> initialColumnNames;
+
private List<Integer> projectedColumns;
private List<String> projectionColumnNames;
private Map<String, Integer> projectionColumnMap;
@@ -158,7 +160,11 @@ public class VectorizationContext {
public VectorizationContext(String contextName, List<String> initialColumnNames) {
this.contextName = contextName;
level = 0;
- LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level + " initialColumnNames " + initialColumnNames.toString());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("VectorizationContext consructor contextName " + contextName + " level "
+ + level + " initialColumnNames " + initialColumnNames);
+ }
+ this.initialColumnNames = initialColumnNames;
this.projectionColumnNames = initialColumnNames;
projectedColumns = new ArrayList<Integer>();
@@ -178,8 +184,11 @@ public class VectorizationContext {
public VectorizationContext(String contextName) {
this.contextName = contextName;
level = 0;
- LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level);
- projectedColumns = new ArrayList<Integer>();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
+ }
+ initialColumnNames = new ArrayList<String>();
+ projectedColumns = new ArrayList<Integer>();
projectionColumnNames = new ArrayList<String>();
projectionColumnMap = new HashMap<String, Integer>();
this.ocm = new OutputColumnManager(0);
@@ -194,6 +203,7 @@ public class VectorizationContext {
this.contextName = contextName;
level = vContext.level + 1;
LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
+ this.initialColumnNames = vContext.initialColumnNames;
this.projectedColumns = new ArrayList<Integer>();
this.projectionColumnNames = new ArrayList<String>();
this.projectionColumnMap = new HashMap<String, Integer>();
@@ -206,6 +216,7 @@ public class VectorizationContext {
// Add an initial column to a vectorization context when
// a vectorized row batch is being created.
public void addInitialColumn(String columnName) {
+ initialColumnNames.add(columnName);
int index = projectedColumns.size();
projectedColumns.add(index);
projectionColumnNames.add(columnName);
@@ -234,6 +245,10 @@ public class VectorizationContext {
projectionColumnMap.put(columnName, vectorBatchColIndex);
}
+ public List<String> getInitialColumnNames() {
+ return initialColumnNames;
+ }
+
public List<Integer> getProjectedColumns() {
return projectedColumns;
}
@@ -2303,36 +2318,51 @@ public class VectorizationContext {
}
public static ColumnVector.Type getColumnVectorTypeFromTypeInfo(TypeInfo typeInfo) throws HiveException {
- PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
- PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
-
- switch (primitiveCategory) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case DATE:
- case TIMESTAMP:
- case INTERVAL_YEAR_MONTH:
- case INTERVAL_DAY_TIME:
- return ColumnVector.Type.LONG;
-
- case FLOAT:
- case DOUBLE:
- return ColumnVector.Type.DOUBLE;
-
- case STRING:
- case CHAR:
- case VARCHAR:
- case BINARY:
- return ColumnVector.Type.BYTES;
-
- case DECIMAL:
- return ColumnVector.Type.DECIMAL;
-
- default:
- throw new HiveException("Unexpected primitive type category " + primitiveCategory);
+ switch (typeInfo.getCategory()) {
+ case STRUCT:
+ return Type.STRUCT;
+ case UNION:
+ return Type.UNION;
+ case LIST:
+ return Type.LIST;
+ case MAP:
+ return Type.MAP;
+ case PRIMITIVE: {
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+ PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory();
+
+ switch (primitiveCategory) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case DATE:
+ case TIMESTAMP:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ return ColumnVector.Type.LONG;
+
+ case FLOAT:
+ case DOUBLE:
+ return ColumnVector.Type.DOUBLE;
+
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ return ColumnVector.Type.BYTES;
+
+ case DECIMAL:
+ return ColumnVector.Type.DECIMAL;
+
+ default:
+ throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
+ }
+ }
+ default:
+ throw new RuntimeException("Unexpected type category " +
+ typeInfo.getCategory());
}
}
@@ -2442,13 +2472,16 @@ public class VectorizationContext {
return firstOutputColumnIndex;
}
- public Map<Integer, String> getScratchColumnTypeMap() {
- Map<Integer, String> map = new HashMap<Integer, String>();
+ public String[] getScratchColumnTypeNames() {
+ String[] result = new String[ocm.outputColCount];
for (int i = 0; i < ocm.outputColCount; i++) {
- String type = ocm.outputColumnsTypes[i];
- map.put(i+this.firstOutputColumnIndex, type);
+ String typeName = ocm.outputColumnsTypes[i];
+ if (typeName.equalsIgnoreCase("long")) {
+ typeName = "bigint"; // Convert our synonym to a real Hive type name.
+ }
+ result[i] = typeName;
}
- return map;
+ return result;
}
@Override
@@ -2468,9 +2501,7 @@ public class VectorizationContext {
}
sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", ");
- Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
- sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap());
- sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap);
+ sb.append("scratchColumnTypeNames ").append(getScratchColumnTypeNames().toString());
return sb.toString();
}
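
Note the conversion in getScratchColumnTypeNames() above: the vectorizer uses "long" internally as a column type name, but Hive's type parser only understands "bigint", so the synonym must be rewritten before the names reach TypeInfoUtils. The mapping isolated as a plain sketch (helper and class names hypothetical):

public class ScratchTypeNameDemo {
  // Rewrite the vectorization-internal synonym to a real Hive type name.
  static String toHiveTypeName(String internalName) {
    if (internalName.equalsIgnoreCase("long")) {
      return "bigint";
    }
    return internalName;
  }

  public static void main(String[] args) {
    for (String t : new String[] {"long", "double", "decimal(10,2)"}) {
      System.out.println(t + " -> " + toHiveTypeName(t));
    }
  }
}
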
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 3d7e4f0..b7e13dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -53,9 +53,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspect
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -111,137 +115,55 @@ public class VectorizedBatchUtil {
batch.size = size;
}
- /**
- * Walk through the object inspector and add column vectors
- *
- * @param oi
- * @param cvList
- * ColumnVectors are populated in this list
- */
- private static void allocateColumnVector(StructObjectInspector oi,
- List<ColumnVector> cvList) throws HiveException {
- if (cvList == null) {
- throw new HiveException("Null columnvector list");
- }
- if (oi == null) {
- return;
- }
- final List<? extends StructField> fields = oi.getAllStructFieldRefs();
- for(StructField field : fields) {
- ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
- switch(fieldObjectInspector.getCategory()) {
- case PRIMITIVE:
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector;
- switch(poi.getPrimitiveCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case TIMESTAMP:
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case INTERVAL_DAY_TIME:
- cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
- break;
- case FLOAT:
- case DOUBLE:
- cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
- break;
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE));
- break;
- case DECIMAL:
- DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
- cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- tInfo.precision(), tInfo.scale()));
- break;
- default:
- throw new HiveException("Vectorizaton is not supported for datatype:"
- + poi.getPrimitiveCategory());
- }
- break;
- case STRUCT:
- throw new HiveException("Struct not supported");
- default:
- throw new HiveException("Flattening is not supported for datatype:"
- + fieldObjectInspector.getCategory());
- }
- }
- }
-
-
- /**
- * Create VectorizedRowBatch from ObjectInspector
- *
- * @param oi
- * @return
- * @throws HiveException
- */
- public static VectorizedRowBatch constructVectorizedRowBatch(
- StructObjectInspector oi) throws HiveException {
- final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
- allocateColumnVector(oi, cvList);
- final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
- int i = 0;
- for(ColumnVector cv : cvList) {
- result.cols[i++] = cv;
- }
- return result;
- }
+ public static ColumnVector createColumnVector(String typeName) {
+ typeName = typeName.toLowerCase();
- /**
- * Create VectorizedRowBatch from key and value object inspectors
- * The row object inspector used by ReduceWork needs to be a **standard**
- * struct object inspector, not just any struct object inspector.
- * @param keyInspector
- * @param valueInspector
- * @param vectorScratchColumnTypeMap
- * @return VectorizedRowBatch, OI
- * @throws HiveException
- */
- public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
- StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
- throws HiveException {
-
- ArrayList<String> colNames = new ArrayList<String>();
- ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
- List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
+ // Allow undecorated CHAR and VARCHAR to support scratch column type names.
+ if (typeName.equals("char") || typeName.equals("varchar")) {
+ return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
}
- fields = valueInspector.getAllStructFieldRefs();
- for (StructField field: fields) {
- colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
- ois.add(field.getFieldObjectInspector());
- }
- StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
- VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
- batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
- return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
+ TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+ return createColumnVector(typeInfo);
}
- /**
- * Iterates through all columns in a given row and populates the batch
- *
- * @param row
- * @param oi
- * @param rowIndex
- * @param batch
- * @param buffer
- * @throws HiveException
- */
- public static void addRowToBatch(Object row, StructObjectInspector oi,
- int rowIndex,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer
- ) throws HiveException {
- addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
+ public static ColumnVector createColumnVector(TypeInfo typeInfo) {
+ switch(typeInfo.getCategory()) {
+ case PRIMITIVE:
+ {
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+ switch(primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case TIMESTAMP:
+ case DATE:
+ case INTERVAL_YEAR_MONTH:
+ case INTERVAL_DAY_TIME:
+ return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ case FLOAT:
+ case DOUBLE:
+ return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ case BINARY:
+ case STRING:
+ case CHAR:
+ case VARCHAR:
+ return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ case DECIMAL:
+ DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+ return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+ tInfo.precision(), tInfo.scale());
+ default:
+ throw new RuntimeException("Vectorizaton is not supported for datatype:"
+ + primitiveTypeInfo.getPrimitiveCategory());
+ }
+ }
+ default:
+ throw new RuntimeException("Vectorization is not supported for datatype:"
+ + typeInfo.getCategory());
+ }
}
/**
@@ -584,31 +506,30 @@ public class VectorizedBatchUtil {
return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
}
- public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
+ public static String[] columnNamesFromStructObjectInspector(
StructObjectInspector structObjectInspector) throws HiveException {
List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
- PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
+ String[] result = new String[fields.size()];
int i = 0;
for(StructField field : fields) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
- field.getFieldObjectInspector().getTypeName());
- result[i++] = (PrimitiveTypeInfo) typeInfo;
+ result[i++] = field.getFieldName();
}
return result;
}
- public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
- String[] typeNames) throws HiveException {
-
- PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
+ public static TypeInfo[] typeInfosFromTypeNames(String[] typeNames) throws HiveException {
+ ArrayList<TypeInfo> typeInfoList =
+ TypeInfoUtils.typeInfosFromTypeNames(Arrays.asList(typeNames));
+ return typeInfoList.toArray(new TypeInfo[0]);
+ }
- for(int i = 0; i < typeNames.length; i++) {
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
- result[i] = (PrimitiveTypeInfo) typeInfo;
- }
- return result;
+ public static TypeInfo[] typeInfosFromStructObjectInspector(
+ StructObjectInspector structObjectInspector) {
+ ArrayList<TypeInfo> typeInfoList =
+ TypeInfoUtils.typeInfosFromStructObjectInspector(structObjectInspector);
+ return typeInfoList.toArray(new TypeInfo[0]);
}
/**
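
The new createColumnVector() factories above replace the old per-ObjectInspector allocation walk, and the String overload deliberately accepts undecorated "char"/"varchar" because scratch column type names may lack length decorations. A short driver sketch (not part of the patch; class name hypothetical):

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;

public class CreateColumnVectorDemo {
  public static void main(String[] args) {
    for (String typeName : new String[] {"bigint", "double", "varchar", "decimal(10,2)"}) {
      // "bigint" -> LongColumnVector, "double" -> DoubleColumnVector,
      // "varchar" -> BytesColumnVector, "decimal(10,2)" -> DecimalColumnVector
      ColumnVector cv = VectorizedBatchUtil.createColumnVector(typeName);
      System.out.println(typeName + " -> " + cv.getClass().getSimpleName());
    }
  }
}
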
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
deleted file mode 100644
index 5ce7553..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyDate;
-import org.apache.hadoop.hive.serde2.lazy.LazyLong;
-import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * VectorizedColumnarSerDe is used by Vectorized query execution engine
- * for columnar based storage supported by RCFile.
- */
-public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde {
-
- public VectorizedColumnarSerDe() throws SerDeException {
- }
-
- private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE];
- private final ObjectWritable ow = new ObjectWritable();
- private final ByteStream.Output serializeVectorStream = new ByteStream.Output();
-
- /**
- * Serialize a vectorized row batch
- *
- * @param vrg
- * Vectorized row batch to serialize
- * @param objInspector
- * The ObjectInspector for the row object
- * @return The serialized Writable object
- * @throws SerDeException
- * @see SerDe#serialize(Object, ObjectInspector)
- */
- @Override
- public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
- throws SerDeException {
- try {
- // Validate that the OI is of struct type
- if (objInspector.getCategory() != Category.STRUCT) {
- throw new UnsupportedOperationException(getClass().toString()
- + " can only serialize struct types, but we got: "
- + objInspector.getTypeName());
- }
-
- VectorizedRowBatch batch = (VectorizedRowBatch) vrg;
- StructObjectInspector soi = (StructObjectInspector) objInspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
-
- // Reset the byte buffer
- serializeVectorStream.reset();
- int count = 0;
- int rowIndex = 0;
- for (int i = 0; i < batch.size; i++) {
-
- // If selectedInUse is true then we need to serialize only
- // the selected indexes
- if (batch.selectedInUse) {
- rowIndex = batch.selected[i];
- } else {
- rowIndex = i;
- }
-
- BytesRefArrayWritable byteRow = byteRefArray[i];
- int numCols = fields.size();
-
- if (byteRow == null) {
- byteRow = new BytesRefArrayWritable(numCols);
- byteRefArray[i] = byteRow;
- }
-
- byteRow.resetValid(numCols);
-
- for (int p = 0; p < batch.projectionSize; p++) {
- int k = batch.projectedColumns[p];
- ObjectInspector foi = fields.get(k).getFieldObjectInspector();
- ColumnVector currentColVector = batch.cols[k];
-
- switch (foi.getCategory()) {
- case PRIMITIVE: {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
- if (!currentColVector.noNulls
- && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) {
- // The column is null hence write null value
- serializeVectorStream.write(new byte[0], 0, 0);
- } else {
- // If here then the vector value is not null.
- if (currentColVector.isRepeating) {
- // If the vector has repeating values then set rowindex to zero
- rowIndex = 0;
- }
-
- switch (poi.getPrimitiveCategory()) {
- case BOOLEAN: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[k];
- // In vectorization true is stored as 1 and false as 0
- boolean b = lcv.vector[rowIndex] == 1 ? true : false;
- if (b) {
- serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
- } else {
- serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
- }
- }
- break;
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- LongColumnVector lcv = (LongColumnVector) batch.cols[k];
- LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]);
- break;
- case FLOAT:
- case DOUBLE:
- DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k];
- ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex]));
- serializeVectorStream.write(b.array(), 0, b.limit());
- break;
- case BINARY: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
- byte[] bytes = bcv.vector[rowIndex];
- serializeVectorStream.write(bytes, 0, bytes.length);
- }
- break;
- case STRING:
- case CHAR:
- case VARCHAR: {
- // Is it correct to escape CHAR and VARCHAR?
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
- LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex],
- bcv.start[rowIndex],
- bcv.length[rowIndex],
- serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams
- .getNeedsEscape());
- }
- break;
- case TIMESTAMP:
- LongColumnVector tcv = (LongColumnVector) batch.cols[k];
- long timeInNanoSec = tcv.vector[rowIndex];
- Timestamp t = new Timestamp(0);
- TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t);
- TimestampWritable tw = new TimestampWritable();
- tw.set(t);
- LazyTimestamp.writeUTF8(serializeVectorStream, tw);
- break;
- case DATE:
- LongColumnVector dacv = (LongColumnVector) batch.cols[k];
- DateWritable daw = new DateWritable((int) dacv.vector[rowIndex]);
- LazyDate.writeUTF8(serializeVectorStream, daw);
- break;
- default:
- throw new UnsupportedOperationException(
- "Vectorizaton is not supported for datatype:"
- + poi.getPrimitiveCategory());
- }
- }
- break;
- }
- case LIST:
- case MAP:
- case STRUCT:
- case UNION:
- throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:"
- + foi.getCategory());
- default:
- throw new SerDeException("Unknown ObjectInspector category!");
-
- }
-
- byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream
- .getLength() - count);
- count = serializeVectorStream.getLength();
- }
-
- }
- ow.set(byteRefArray);
- } catch (Exception e) {
- throw new SerDeException(e);
- }
- return ow;
- }
-
- @Override
- public SerDeStats getSerDeStats() {
- return null;
- }
-
- @Override
- public Class<? extends Writable> getSerializedClass() {
- return BytesRefArrayWritable.class;
- }
-
- @Override
- public Object deserialize(Writable blob) throws SerDeException {
-
- // Ideally this should throw UnsupportedOperationException as the serde is
- // vectorized serde. But since RC file reader does not support vectorized reading this
- // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch
- // to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized
- // reading this serde and be standalone serde with no dependency on ColumnarSerDe.
- return super.deserialize(blob);
- }
-
- @Override
- public ObjectInspector getObjectInspector() throws SerDeException {
- return cachedObjectInspector;
- }
-
- @Override
- public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Deserializes the rowBlob into Vectorized row batch
- * @param rowBlob
- * rowBlob row batch to deserialize
- * @param rowsInBlob
- * Total number of rows in rowBlob to deserialize
- * @param reuseBatch
- * VectorizedRowBatch to which the rows should be serialized *
- * @throws SerDeException
- */
- @Override
- public void deserializeVector(Object rowBlob, int rowsInBlob,
- VectorizedRowBatch reuseBatch) throws SerDeException {
-
- BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
- DataOutputBuffer buffer = new DataOutputBuffer();
- for (int i = 0; i < rowsInBlob; i++) {
- Object row = deserialize(refArray[i]);
- try {
- VectorizedBatchUtil.addRowToBatch(row,
- (StructObjectInspector) cachedObjectInspector, i,
- reuseBatch, buffer);
- } catch (HiveException e) {
- throw new SerDeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index 7e41384..2882024 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -45,6 +45,10 @@ public class VectorizedRowBatch implements Writable {
public int[] projectedColumns;
public int projectionSize;
+ private int dataColumnCount;
+ private int partitionColumnCount;
+
+
/*
* If no filtering has been applied yet, selectedInUse is false,
* meaning that all rows qualify. If it is true, then the selected[] array
@@ -94,6 +98,22 @@ public class VectorizedRowBatch implements Writable {
for (int i = 0; i < numCols; i++) {
projectedColumns[i] = i;
}
+
+ dataColumnCount = -1;
+ partitionColumnCount = -1;
+ }
+
+ public void setPartitionInfo(int dataColumnCount, int partitionColumnCount) {
+ this.dataColumnCount = dataColumnCount;
+ this.partitionColumnCount = partitionColumnCount;
+ }
+
+ public int getDataColumnCount() {
+ return dataColumnCount;
+ }
+
+ public int getPartitionColumnCount() {
+ return partitionColumnCount;
}
/**
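
The counts added to VectorizedRowBatch default to -1 and only become meaningful once setPartitionInfo() is called by whatever builds the batch. A minimal sketch (hypothetical class name):

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class BatchPartitionInfoDemo {
  public static void main(String[] args) {
    VectorizedRowBatch batch = new VectorizedRowBatch(4);
    System.out.println(batch.getDataColumnCount());       // -1, not yet set

    // Declare three data columns plus one partition column.
    batch.setPartitionInfo(3, 1);
    System.out.println(batch.getDataColumnCount());       // 3
    System.out.println(batch.getPartitionColumnCount());  // 1
  }
}
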
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 81ab129..efb06b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -20,16 +20,10 @@ package org.apache.hadoop.hive.ql.exec.vector;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,392 +32,270 @@ import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hive.common.util.DateUtils;
/**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
+ * Context for Vectorized row batch. this class does eager deserialization of row data using serde
* in the RecordReader layer.
* It has supports partitions in this layer so that the vectorized batch is populated correctly
* with the partition column.
*/
public class VectorizedRowBatchCtx {
+ private static final long serialVersionUID = 1L;
+
private static final Log LOG = LogFactory.getLog(VectorizedRowBatchCtx.class.getName());
- // OI for raw row data (EG without partition cols)
- private StructObjectInspector rawRowOI;
+ // The following information is for creating VectorizedRowBatch and for helping with
+ // knowing how the table is partitioned.
+ //
+ // It will be stored in MapWork and ReduceWork.
+ private String[] rowColumnNames;
+ private TypeInfo[] rowColumnTypeInfos;
+ private int dataColumnCount;
+ private int partitionColumnCount;
- // OI for the row (Raw row OI + partition OI)
- private StructObjectInspector rowOI;
+ private String[] scratchColumnTypeNames;
- // Deserializer for the row data
- private Deserializer deserializer;
+ /**
+ * Constructor for VectorizedRowBatchCtx
+ */
+ public VectorizedRowBatchCtx() {
+ }
- // Hash map of partition values. Key=TblColName value=PartitionValue
- private Map<String, Object> partitionValues;
-
- //partition types
- private Map<String, PrimitiveCategory> partitionTypes;
+ public VectorizedRowBatchCtx(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos,
+ int partitionColumnCount, String[] scratchColumnTypeNames) {
+ this.rowColumnNames = rowColumnNames;
+ this.rowColumnTypeInfos = rowColumnTypeInfos;
+ this.partitionColumnCount = partitionColumnCount;
+ this.scratchColumnTypeNames = scratchColumnTypeNames;
- // partition column positions, for use by classes that need to know whether a given column is a
- // partition column
- private Set<Integer> partitionCols;
-
- // Column projection list - List of column indexes to include. This
- // list does not contain partition columns
- private List<Integer> colsToInclude;
+ dataColumnCount = rowColumnTypeInfos.length - partitionColumnCount;
+ }
- private Map<Integer, String> scratchColumnTypeMap = null;
+ public String[] getRowColumnNames() {
+ return rowColumnNames;
+ }
- /**
- * Constructor for VectorizedRowBatchCtx
- *
- * @param rawRowOI
- * OI for raw row data (EG without partition cols)
- * @param rowOI
- * OI for the row (Raw row OI + partition OI)
- * @param deserializer
- * Deserializer for the row data
- * @param partitionValues
- * Hash map of partition values. Key=TblColName value=PartitionValue
- */
- public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI,
- Deserializer deserializer, Map<String, Object> partitionValues,
- Map<String, PrimitiveCategory> partitionTypes) {
- this.rowOI = rowOI;
- this.rawRowOI = rawRowOI;
- this.deserializer = deserializer;
- this.partitionValues = partitionValues;
- this.partitionTypes = partitionTypes;
+ public TypeInfo[] getRowColumnTypeInfos() {
+ return rowColumnTypeInfos;
}
- /**
- * Constructor for VectorizedRowBatchCtx
- */
- public VectorizedRowBatchCtx() {
+ public int getDataColumnCount() {
+ return dataColumnCount;
+ }
+ public int getPartitionColumnCount() {
+ return partitionColumnCount;
+ }
+
+ public String[] getScratchColumnTypeNames() {
+ return scratchColumnTypeNames;
}
/**
- * Initializes the VectorizedRowBatch context based on an scratch column type map and
+ * Initializes the VectorizedRowBatch context based on an scratch column type names and
* object inspector.
- * @param scratchColumnTypeMap
- * @param rowOI
+ * @param structObjectInspector
+ * @param scratchColumnTypeNames
* Object inspector that shapes the column types
+ * @throws HiveException
*/
- public void init(Map<Integer, String> scratchColumnTypeMap,
- StructObjectInspector rowOI) {
- this.scratchColumnTypeMap = scratchColumnTypeMap;
- this.rowOI= rowOI;
- this.rawRowOI = rowOI;
+ public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames)
+ throws HiveException {
+
+ // Row column information.
+ rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector);
+ rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector);
+ partitionColumnCount = 0;
+ dataColumnCount = rowColumnTypeInfos.length;
+
+ // Scratch column information.
+ this.scratchColumnTypeNames = scratchColumnTypeNames;
}
- /**
- * Initializes VectorizedRowBatch context based on the
- * split and Hive configuration (Job conf with hive Plan).
- *
- * @param hiveConf
- * Hive configuration using Hive plan is extracted
- * @param split
- * File split of the file being read
- * @throws ClassNotFoundException
- * @throws IOException
- * @throws SerDeException
- * @throws InstantiationException
- * @throws IllegalAccessException
- * @throws HiveException
- */
- public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
- IOException,
- SerDeException,
- InstantiationException,
- IllegalAccessException,
- HiveException {
+ public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf,
+ FileSplit split, Object[] partitionValues) throws IOException {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
.getMapWork(hiveConf).getPathToPartitionInfo();
- PartitionDesc part = HiveFileFormatUtils
+ PartitionDesc partDesc = HiveFileFormatUtils
.getPartitionDescFromPathRecursively(pathToPartitionInfo,
split.getPath(), IOPrepareCache.get().getPartitionDescMap());
- String partitionPath = split.getPath().getParent().toString();
- scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf);
- // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString());
-
- Properties partProps =
- (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ?
- part.getTableDesc().getProperties() : part.getProperties();
-
- Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName());
- Deserializer partDeserializer = (Deserializer) serdeclass.newInstance();
- SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(),
- partProps);
- StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
- .getObjectInspector();
-
- deserializer = partDeserializer;
-
- // Check to see if this split is part of a partition of a table
- String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-
- String[] partKeys = null;
- if (pcols != null && pcols.length() > 0) {
-
- // Partitions exist for this table. Get the partition object inspector and
- // raw row object inspector (row with out partition col)
- LinkedHashMap<String, String> partSpec = part.getPartSpec();
- partKeys = pcols.trim().split("/");
- String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- String[] partKeyTypes = pcolTypes.trim().split(":");
-
- if (partKeys.length > partKeyTypes.length) {
- throw new HiveException("Internal error : partKeys length, " +partKeys.length +
- " greater than partKeyTypes length, " + partKeyTypes.length);
- }
-
- List<String> partNames = new ArrayList<String>(partKeys.length);
- List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
- partitionValues = new LinkedHashMap<String, Object>();
- partitionTypes = new LinkedHashMap<String, PrimitiveCategory>();
- for (int i = 0; i < partKeys.length; i++) {
- String key = partKeys[i];
- partNames.add(key);
- ObjectInspector objectInspector = null;
- Object objectVal;
- if (partSpec == null) {
- // for partitionless table, initialize partValue to empty string.
- // We can have partitionless table even if we have partition keys
- // when there is only only partition selected and the partition key is not
- // part of the projection/include list.
- objectVal = null;
- objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- partitionTypes.put(key, PrimitiveCategory.STRING);
- } else {
- // Create a Standard java object Inspector
- PrimitiveTypeInfo partColTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]);
- objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
- partColTypeInfo);
- objectVal =
- ObjectInspectorConverters.
- getConverter(PrimitiveObjectInspectorFactory.
- javaStringObjectInspector, objectInspector).
- convert(partSpec.get(key));
- if (partColTypeInfo instanceof CharTypeInfo) {
- objectVal = ((HiveChar) objectVal).getStrippedValue();
- }
- partitionTypes.put(key, partColTypeInfo.getPrimitiveCategory());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key));
- }
- partitionValues.put(key, objectVal);
- partObjectInspectors.add(objectInspector);
- }
-
- // Create partition OI
- StructObjectInspector partObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
- // Get row OI from partition OI and raw row OI
- StructObjectInspector rowObjectInspector = ObjectInspectorFactory
- .getUnionStructObjectInspector(Arrays
- .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector}));
- rowOI = rowObjectInspector;
- rawRowOI = partRawRowObjectInspector;
-
- // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it
- partitionCols = new HashSet<Integer>();
- if (pcols != null && pcols.length() > 0) {
- for (int i = 0; i < partKeys.length; i++) {
- partitionCols.add(getColIndexBasedOnColName(partKeys[i]));
- }
- }
+ getPartitionValues(vrbCtx, partDesc, partitionValues);
- } else {
+ }
- // No partitions for this table, hence row OI equals raw row OI
- rowOI = partRawRowObjectInspector;
- rawRowOI = partRawRowObjectInspector;
+ public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc,
+ Object[] partitionValues) {
+
+ LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+
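+ // For each partition column, convert the partition spec's string value into a
+ // typed object (or null for a partition-less split).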
+ for (int i = 0; i < vrbCtx.partitionColumnCount; i++) {
+ Object objectValue;
+ if (partSpec == null) {
+ // For a partition-less table, initialize the partition value to null.
+ // We can have a partition-less table even if there are partition keys,
+ // when only one partition is selected and the partition key is not
+ // part of the projection/include list.
+ objectValue = null;
+ } else {
+ String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i];
+
+ // Create a standard Java object inspector.
+ ObjectInspector objectInspector =
+ TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+ vrbCtx.rowColumnTypeInfos[vrbCtx.dataColumnCount + i]);
+ objectValue =
+ ObjectInspectorConverters.
+ getConverter(PrimitiveObjectInspectorFactory.
+ javaStringObjectInspector, objectInspector).
+ convert(partSpec.get(key));
+ }
+ partitionValues[i] = objectValue;
}
-
- colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf);
}
-
+
/**
* Creates a Vectorized row batch and the column vectors.
*
* @return VectorizedRowBatch
* @throws HiveException
*/
- public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
+ public VectorizedRowBatch createVectorizedRowBatch()
{
- List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
- VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
- for (int j = 0; j < fieldRefs.size(); j++) {
- // If the column is included in the include list or if the column is a
- // partition column then create the column vector. Also note that partition columns are not
- // in the included list.
- if ((colsToInclude == null) || colsToInclude.contains(j)
- || ((partitionValues != null) &&
- partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) {
- ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
- switch (foi.getCategory()) {
- case PRIMITIVE: {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
- // Vectorization currently only supports the following data types:
- // BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, TIMESTAMP,
- // DATE and DECIMAL
- switch (poi.getPrimitiveCategory()) {
- case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- case TIMESTAMP:
- case DATE:
- case INTERVAL_YEAR_MONTH:
- case INTERVAL_DAY_TIME:
- result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
- break;
- case FLOAT:
- case DOUBLE:
- result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
- break;
- case BINARY:
- case STRING:
- case CHAR:
- case VARCHAR:
- result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
- break;
- case DECIMAL:
- DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
- result.cols[j] = new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
- tInfo.precision(), tInfo.scale());
- break;
- default:
- throw new RuntimeException("Vectorizaton is not supported for datatype:"
- + poi.getPrimitiveCategory());
- }
- break;
- }
- case LIST:
- case MAP:
- case STRUCT:
- case UNION:
- throw new HiveException("Vectorizaton is not supported for datatype:"
- + foi.getCategory());
- default:
- throw new HiveException("Unknown ObjectInspector category!");
- }
- }
+ int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+ VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+ LOG.info("createVectorizedRowBatch columnsToIncludeTruncated NONE");
+ for (int i = 0; i < rowColumnTypeInfos.length; i++) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+ }
+
+ for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+ String typeName = scratchColumnTypeNames[i];
+ result.cols[rowColumnTypeInfos.length + i] =
+ VectorizedBatchUtil.createColumnVector(typeName);
}
- result.numCols = fieldRefs.size();
- this.addScratchColumnsToBatch(result);
+
+ result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
result.reset();
return result;
}
- /**
- * Adds the row to the batch after deserializing the row
- *
- * @param rowIndex
- * Row index in the batch to which the row is added
- * @param rowBlob
- * Row blob (serialized version of row)
- * @param batch
- * Vectorized batch to which the row is added
- * @param buffer a buffer to copy strings into
- * @throws HiveException
- * @throws SerDeException
- */
- public void addRowToBatch(int rowIndex, Writable rowBlob,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer
- ) throws HiveException, SerDeException
+ public VectorizedRowBatch createVectorizedRowBatch(boolean[] columnsToIncludeTruncated)
{
- Object row = this.deserializer.deserialize(rowBlob);
- VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer);
- }
+ if (columnsToIncludeTruncated == null) {
+ return createVectorizedRowBatch();
+ }
- /**
- * Deserialized set of rows and populates the batch
- *
- * @param rowBlob
- * to deserialize
- * @param batch
- * Vectorized row batch which contains deserialized data
- * @throws SerDeException
- */
- public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
- VectorizedRowBatch batch)
- throws SerDeException {
-
- if (deserializer instanceof VectorizedSerde) {
- ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch);
- } else {
- throw new SerDeException(
- "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+ LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
+ int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+ VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+ for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
+ if (columnsToIncludeTruncated[i]) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+ }
}
+
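+ // Partition columns are always allocated; the include list covers data columns only.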
+ for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+ }
+
+ for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+ String typeName = scratchColumnTypeNames[i];
+ result.cols[rowColumnTypeInfos.length + i] =
+ VectorizedBatchUtil.createColumnVector(typeName);
+ }
+
+ result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
+ result.reset();
+ return result;
}
- private int getColIndexBasedOnColName(String colName) throws HiveException
- {
- List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
- for (int i = 0; i < fieldRefs.size(); i++) {
- if (fieldRefs.get(i).getFieldName().equals(colName)) {
- return i;
+ public boolean[] getColumnsToIncludeTruncated(Configuration conf) {
+ boolean[] columnsToIncludeTruncated = null;
+
+ List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(conf);
+ if (columnsToIncludeTruncatedList != null && columnsToIncludeTruncatedList.size() > 0) {
+
+ // Partitioned columns will not be in the include list.
+
+ boolean[] columnsToInclude = new boolean[dataColumnCount];
+ Arrays.fill(columnsToInclude, false);
+ for (int columnNum : columnsToIncludeTruncatedList) {
+ if (columnNum < dataColumnCount) {
+ columnsToInclude[columnNum] = true;
+ }
+ }
+
+ // Work backwards to find the highest wanted column.
+
+ int highestWantedColumnNum = -1;
+ for (int i = dataColumnCount - 1; i >= 0; i--) {
+ if (columnsToInclude[i]) {
+ highestWantedColumnNum = i;
+ break;
+ }
+ }
+ if (highestWantedColumnNum == -1) {
+ throw new RuntimeException("No columns to include?");
+ }
+ int newColumnCount = highestWantedColumnNum + 1;
+ if (newColumnCount == dataColumnCount) {
+ // Didn't trim any columns off the end. Use the original.
+ columnsToIncludeTruncated = columnsToInclude;
+ } else {
+ columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
}
}
- throw new HiveException("Not able to find column name in row object inspector");
+ return columnsToIncludeTruncated;
}
-
+
/**
* Add the partition values to the batch
*
* @param batch
+ * @param partitionValues
* @throws HiveException
*/
- public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
+ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues)
{
- int colIndex;
- Object value;
- PrimitiveCategory pCategory;
if (partitionValues != null) {
- for (String key : partitionValues.keySet()) {
- colIndex = getColIndexBasedOnColName(key);
- value = partitionValues.get(key);
- pCategory = partitionTypes.get(key);
-
- switch (pCategory) {
+ for (int i = 0; i < partitionColumnCount; i++) {
+ Object value = partitionValues[i];
+
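+ // Each partition value is constant for the whole split, so the cases below
+ // fill the column vector as a single repeating entry.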
+ int colIndex = dataColumnCount + i;
+ String partitionColumnName = rowColumnNames[colIndex];
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex];
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BOOLEAN: {
LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex];
if (value == null) {
@@ -575,7 +447,7 @@ public class VectorizedRowBatchCtx {
HiveDecimal hd = (HiveDecimal) value;
dv.set(0, hd);
dv.isRepeating = true;
- dv.isNull[0] = false;
+ dv.isNull[0] = false;
}
}
break;
@@ -604,15 +476,15 @@ public class VectorizedRowBatchCtx {
bcv.isNull[0] = true;
bcv.isRepeating = true;
} else {
- bcv.fill(sVal.getBytes());
+ bcv.fill(sVal.getBytes());
bcv.isNull[0] = false;
}
}
break;
-
+
default:
- throw new HiveException("Unable to recognize the partition type " + pCategory +
- " for column " + key);
+ throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() +
+ " for column " + partitionColumnName);
}
}
}
@@ -620,64 +492,12 @@ public class VectorizedRowBatchCtx {
/**
* Determine whether a given column is a partition column
- * @param colnum column number in
+ * @param colNum column number in
* {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context.
* @return true if it is a partition column, false otherwise
*/
- public final boolean isPartitionCol(int colnum) {
- return (partitionCols == null) ? false : partitionCols.contains(colnum);
- }
-
- private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
- if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) {
- int origNumCols = vrb.numCols;
- int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size();
- vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
- for (int i = origNumCols; i < newNumCols; i++) {
- String typeName = scratchColumnTypeMap.get(i);
- if (typeName == null) {
- throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString());
- }
- vrb.cols[i] = allocateColumnVector(typeName,
- VectorizedRowBatch.DEFAULT_SIZE);
- }
- vrb.numCols = vrb.cols.length;
- }
- }
-
- /**
- * Get the scale and precision for the given decimal type string. The decimal type is assumed to be
- * of the format decimal(precision,scale) e.g. decimal(20,10).
- * @param decimalType The given decimal type string.
- * @return An integer array of size 2 with first element set to precision and second set to scale.
- */
- private static int[] getScalePrecisionFromDecimalType(String decimalType) {
- Pattern p = Pattern.compile("\\d+");
- Matcher m = p.matcher(decimalType);
- m.find();
- int precision = Integer.parseInt(m.group());
- m.find();
- int scale = Integer.parseInt(m.group());
- int [] precScale = { precision, scale };
- return precScale;
+ public final boolean isPartitionCol(int colNum) {
+ return colNum >= dataColumnCount && colNum < rowColumnTypeInfos.length;
}
- public static ColumnVector allocateColumnVector(String type, int defaultSize) {
- if (type.equalsIgnoreCase("double")) {
- return new DoubleColumnVector(defaultSize);
- } else if (VectorizationContext.isStringFamily(type)) {
- return new BytesColumnVector(defaultSize);
- } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){
- int [] precisionScale = getScalePrecisionFromDecimalType(type);
- return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]);
- } else if (type.equalsIgnoreCase("long") ||
- type.equalsIgnoreCase("date") ||
- type.equalsIgnoreCase("timestamp") ||
- type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) ||
- type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) {
- return new LongColumnVector(defaultSize);
- } else {
- throw new RuntimeException("Cannot allocate vector column for " + type);
- }
- }
}
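
For readers who want to try the new partition-value flow above in isolation: the sketch below mirrors what getPartitionValues() and addPartitionColsToBatch() do, converting a partition spec's string value to the column's declared type with the standard converters and pinning it into a repeating column vector. The class name PartitionValueSketch and the sample value are made up for illustration; the Hive calls are the ones used in this patch.

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class PartitionValueSketch {
  public static void main(String[] args) {
    // Partition spec values arrive as strings, e.g. hr=7 for an int partition key.
    String rawValue = "7";

    // Convert the string to the column's declared type, as getPartitionValues() does.
    ObjectInspector intOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
        TypeInfoFactory.intTypeInfo);
    Object typed = ObjectInspectorConverters.getConverter(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector, intOI)
        .convert(rawValue);

    // The value is constant per split, so addPartitionColsToBatch() fills a
    // repeating vector: a single slot, marked isRepeating.
    LongColumnVector lcv = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    lcv.vector[0] = ((Integer) typed).longValue();
    lcv.isRepeating = true;
    lcv.isNull[0] = false;
    System.out.println("typed partition value: " + lcv.vector[0]);
  }
}
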
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 35e3403..f28d3ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,12 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
import org.apache.hadoop.hive.ql.exec.HashTableLoader;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
@@ -58,7 +52,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized.VectorMapJoinOpti
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -70,6 +63,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
* This class is common operator class for native vectorized map join.
@@ -572,10 +567,11 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
* Create our vectorized copy row and deserialize row helper objects.
*/
if (smallTableMapping.getCount() > 0) {
- smallTableVectorDeserializeRow = new VectorDeserializeRow(
- new LazyBinaryDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
- smallTableMapping.getTypeNames())));
+ smallTableVectorDeserializeRow =
+ new VectorDeserializeRow(
+ new LazyBinaryDeserializeRead(
+ VectorizedBatchUtil.typeInfosFromTypeNames(
+ smallTableMapping.getTypeNames())));
smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns());
}
@@ -649,23 +645,13 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
* Setup our 2nd batch with the same "column schema" as the big table batch that can be used to
* build join output results in.
*/
- protected VectorizedRowBatch setupOverflowBatch() {
+ protected VectorizedRowBatch setupOverflowBatch() throws HiveException {
+
+ int initialColumnCount = vContext.firstOutputColumnIndex();
VectorizedRowBatch overflowBatch;
- Map<Integer, String> scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap();
- int maxColumn = 0;
- for (int i = 0; i < outputProjection.length; i++) {
- int outputColumn = outputProjection[i];
- if (maxColumn < outputColumn) {
- maxColumn = outputColumn;
- }
- }
- for (int outputColumn : scratchColumnTypeMap.keySet()) {
- if (maxColumn < outputColumn) {
- maxColumn = outputColumn;
- }
- }
- overflowBatch = new VectorizedRowBatch(maxColumn + 1);
+ int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length;
+ overflowBatch = new VectorizedRowBatch(totalNumColumns);
// First, just allocate just the projection columns we will be using.
for (int i = 0; i < outputProjection.length; i++) {
@@ -675,9 +661,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
}
// Now, add any scratch columns needed for children operators.
- for (int outputColumn : scratchColumnTypeMap.keySet()) {
- String typeName = scratchColumnTypeMap.get(outputColumn);
- allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName);
+ int outputColumn = initialColumnCount;
+ for (String typeName : vOutContext.getScratchColumnTypeNames()) {
+ allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName);
}
overflowBatch.projectedColumns = outputProjection;
@@ -695,22 +681,13 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
String typeName) {
if (overflowBatch.cols[outputColumn] == null) {
- String vectorTypeName;
- if (VectorizationContext.isIntFamily(typeName) ||
- VectorizationContext.isDatetimeFamily(typeName)) {
- vectorTypeName = "long";
- } else if (VectorizationContext.isFloatFamily(typeName)) {
- vectorTypeName = "double";
- } else if (VectorizationContext.isStringFamily(typeName)) {
- vectorTypeName = "string";
- } else if (VectorizationContext.decimalTypePattern.matcher(typeName).matches()){
- vectorTypeName = typeName; // Keep precision and scale.
- } else {
- throw new RuntimeException("Cannot determine vector type for " + typeName);
- }
- overflowBatch.cols[outputColumn] = VectorizedRowBatchCtx.allocateColumnVector(vectorTypeName, VectorizedRowBatch.DEFAULT_SIZE);
+ typeName = VectorizationContext.mapTypeNameSynonyms(typeName);
- if (LOG.isDebugEnabled()) {
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+
+ overflowBatch.cols[outputColumn] = VectorizedBatchUtil.createColumnVector(typeInfo);
+
+ if (isLogDebugEnabled) {
LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator initializeOp overflowBatch outputColumn " + outputColumn + " class " + overflowBatch.cols[outputColumn].getClass().getSimpleName());
}
}
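
To summarize the simplified overflow-batch allocation above: instead of hand-mapping type names onto vector classes, the operator now normalizes type-name synonyms, parses a TypeInfo, and delegates to VectorizedBatchUtil.createColumnVector(). Below is a minimal sketch of that path, assuming the ql and serde jars from this patch are on the classpath; the names OverflowColumnSketch and allocate are made up for illustration.

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class OverflowColumnSketch {

  // Mirrors allocateOverflowBatchColumnVector(): normalize type-name synonyms,
  // parse the name into a TypeInfo, and let VectorizedBatchUtil pick the
  // ColumnVector subclass (Long, Double, Bytes, Decimal, ...).
  static ColumnVector allocate(String typeName) {
    typeName = VectorizationContext.mapTypeNameSynonyms(typeName);
    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
    return VectorizedBatchUtil.createColumnVector(typeInfo);
  }

  public static void main(String[] args) {
    // Expected: LongColumnVector, and DecimalColumnVector with precision/scale kept.
    System.out.println(allocate("bigint").getClass().getSimpleName());
    System.out.println(allocate("decimal(20,10)").getClass().getSimpleName());
  }
}
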