Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [20/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Mon Sep 8 04:38:17 2014
@@ -28,6 +28,7 @@ import java.util.List;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.common.type.Decimal128;
+import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.ql.exec.vector.*;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableDateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableDoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveCharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector;
@@ -82,7 +84,7 @@ public final class VectorExpressionWrite
this.objectInspector = objectInspector;
return this;
}
-
+
/**
* The base implementation must be overridden by the Long specialization
*/
@@ -90,7 +92,7 @@ public final class VectorExpressionWrite
public Object writeValue(long value) throws HiveException {
throw new HiveException("Internal error: should not reach here");
}
-
+
/**
* The base implementation must be overridden by the Long specialization
*/
@@ -112,7 +114,7 @@ public final class VectorExpressionWrite
public Object setValue(Object field, double value) throws HiveException {
throw new HiveException("Internal error: should not reach here");
}
-
+
/**
* The base implementation must be overridden by the Bytes specialization
*/
@@ -120,7 +122,7 @@ public final class VectorExpressionWrite
public Object writeValue(byte[] value, int start, int length) throws HiveException {
throw new HiveException("Internal error: should not reach here");
}
-
+
/**
* The base implementation must be overridden by the Bytes specialization
*/
@@ -171,7 +173,7 @@ public final class VectorExpressionWrite
"Incorrect null/repeating: row:%d noNulls:%b isRepeating:%b isNull[row]:%b isNull[0]:%b",
row, lcv.noNulls, lcv.isRepeating, lcv.isNull[row], lcv.isNull[0]));
}
-
+
@Override
public Object setValue(Object field, ColumnVector column, int row) throws HiveException {
LongColumnVector lcv = (LongColumnVector) column;
@@ -192,7 +194,7 @@ public final class VectorExpressionWrite
String.format(
"Incorrect null/repeating: row:%d noNulls:%b isRepeating:%b isNull[row]:%b isNull[0]:%b",
row, lcv.noNulls, lcv.isRepeating, lcv.isNull[row], lcv.isNull[0]));
- }
+ }
}
/**
@@ -221,7 +223,7 @@ public final class VectorExpressionWrite
"Incorrect null/repeating: row:%d noNulls:%b isRepeating:%b isNull[row]:%b isNull[0]:%b",
row, dcv.noNulls, dcv.isRepeating, dcv.isNull[row], dcv.isNull[0]));
}
-
+
@Override
public Object setValue(Object field, ColumnVector column, int row) throws HiveException {
DoubleColumnVector dcv = (DoubleColumnVector) column;
@@ -242,7 +244,7 @@ public final class VectorExpressionWrite
String.format(
"Incorrect null/repeating: row:%d noNulls:%b isRepeating:%b isNull[row]:%b isNull[0]:%b",
row, dcv.noNulls, dcv.isRepeating, dcv.isNull[row], dcv.isNull[0]));
- }
+ }
}
/**
@@ -292,7 +294,7 @@ public final class VectorExpressionWrite
String.format(
"Incorrect null/repeating: row:%d noNulls:%b isRepeating:%b isNull[row]:%b isNull[0]:%b",
row, bcv.noNulls, bcv.isRepeating, bcv.isNull[row], bcv.isNull[0]));
- }
+ }
}
@@ -396,13 +398,16 @@ public final class VectorExpressionWrite
(SettableLongObjectInspector) fieldObjInspector);
case VOID:
return genVectorExpressionWritableVoid(
- (VoidObjectInspector) fieldObjInspector);
+ (VoidObjectInspector) fieldObjInspector);
case BINARY:
return genVectorExpressionWritableBinary(
(SettableBinaryObjectInspector) fieldObjInspector);
case STRING:
return genVectorExpressionWritableString(
(SettableStringObjectInspector) fieldObjInspector);
+ case CHAR:
+ return genVectorExpressionWritableChar(
+ (SettableHiveCharObjectInspector) fieldObjInspector);
case VARCHAR:
return genVectorExpressionWritableVarchar(
(SettableHiveVarcharObjectInspector) fieldObjInspector);
@@ -419,7 +424,7 @@ public final class VectorExpressionWrite
throw new IllegalArgumentException("Unknown primitive type: " +
((PrimitiveObjectInspector) fieldObjInspector).getPrimitiveCategory());
}
-
+
case STRUCT:
case UNION:
case MAP:
@@ -428,7 +433,7 @@ public final class VectorExpressionWrite
fieldObjInspector.getCategory());
default:
throw new IllegalArgumentException("Unknown type " +
- fieldObjInspector.getCategory());
+ fieldObjInspector.getCategory());
}
}
@@ -526,7 +531,7 @@ public final class VectorExpressionWrite
private Object obj;
private Timestamp ts;
- public VectorExpressionWriter init(SettableTimestampObjectInspector objInspector)
+ public VectorExpressionWriter init(SettableTimestampObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
ts = new Timestamp(0);
@@ -550,7 +555,7 @@ public final class VectorExpressionWrite
((SettableTimestampObjectInspector) this.objectInspector).set(field, ts);
return field;
}
-
+
@Override
public Object initValue(Object ignored) {
return ((SettableTimestampObjectInspector) this.objectInspector).create(new Timestamp(0));
@@ -558,13 +563,13 @@ public final class VectorExpressionWrite
}.init(fieldObjInspector);
}
- private static VectorExpressionWriter genVectorExpressionWritableVarchar(
- SettableHiveVarcharObjectInspector fieldObjInspector) throws HiveException {
+ private static VectorExpressionWriter genVectorExpressionWritableChar(
+ SettableHiveCharObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterBytes() {
private Object obj;
private Text text;
- public VectorExpressionWriter init(SettableHiveVarcharObjectInspector objInspector)
+ public VectorExpressionWriter init(SettableHiveCharObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.text = new Text();
@@ -575,7 +580,7 @@ public final class VectorExpressionWrite
@Override
public Object writeValue(byte[] value, int start, int length) throws HiveException {
text.set(value, start, length);
- ((SettableHiveVarcharObjectInspector) this.objectInspector).set(this.obj, text.toString());
+ ((SettableHiveCharObjectInspector) this.objectInspector).set(this.obj, text.toString());
return this.obj;
}
@@ -586,12 +591,52 @@ public final class VectorExpressionWrite
field = initValue(null);
}
text.set(value, start, length);
- ((SettableHiveVarcharObjectInspector) this.objectInspector).set(field, text.toString());
+ ((SettableHiveCharObjectInspector) this.objectInspector).set(field, text.toString());
return field;
}
@Override
public Object initValue(Object ignored) {
+ return ((SettableHiveCharObjectInspector) this.objectInspector)
+ .create(new HiveChar(StringUtils.EMPTY, -1));
+ }
+ }.init(fieldObjInspector);
+ }
+
+ private static VectorExpressionWriter genVectorExpressionWritableVarchar(
+ SettableHiveVarcharObjectInspector fieldObjInspector) throws HiveException {
+ return new VectorExpressionWriterBytes() {
+ private Object obj;
+ private Text text;
+
+ public VectorExpressionWriter init(SettableHiveVarcharObjectInspector objInspector)
+ throws HiveException {
+ super.init(objInspector);
+ this.text = new Text();
+ this.obj = initValue(null);
+ return this;
+ }
+
+ @Override
+ public Object writeValue(byte[] value, int start, int length) throws HiveException {
+ text.set(value, start, length);
+ ((SettableHiveVarcharObjectInspector) this.objectInspector).set(this.obj, text.toString());
+ return this.obj;
+ }
+
+ @Override
+ public Object setValue(Object field, byte[] value, int start, int length)
+ throws HiveException {
+ if (null == field) {
+ field = initValue(null);
+ }
+ text.set(value, start, length);
+ ((SettableHiveVarcharObjectInspector) this.objectInspector).set(field, text.toString());
+ return field;
+ }
+
+ @Override
+ public Object initValue(Object ignored) {
return ((SettableHiveVarcharObjectInspector) this.objectInspector)
.create(new HiveVarchar(StringUtils.EMPTY, -1));
}
@@ -603,24 +648,24 @@ public final class VectorExpressionWrite
return new VectorExpressionWriterBytes() {
private Object obj;
private Text text;
-
- public VectorExpressionWriter init(SettableStringObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableStringObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.text = new Text();
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(byte[] value, int start, int length) throws HiveException {
this.text.set(value, start, length);
((SettableStringObjectInspector) this.objectInspector).set(this.obj, this.text.toString());
return this.obj;
}
-
+
@Override
- public Object setValue(Object field, byte[] value, int start, int length)
+ public Object setValue(Object field, byte[] value, int start, int length)
throws HiveException {
if (null == field) {
field = initValue(null);
@@ -628,12 +673,12 @@ public final class VectorExpressionWrite
this.text.set(value, start, length);
((SettableStringObjectInspector) this.objectInspector).set(field, this.text.toString());
return field;
- }
-
+ }
+
@Override
public Object initValue(Object ignored) {
return ((SettableStringObjectInspector) this.objectInspector).create(StringUtils.EMPTY);
- }
+ }
}.init(fieldObjInspector);
}
@@ -642,22 +687,22 @@ public final class VectorExpressionWrite
return new VectorExpressionWriterBytes() {
private Object obj;
private byte[] bytes;
-
- public VectorExpressionWriter init(SettableBinaryObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableBinaryObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.bytes = ArrayUtils.EMPTY_BYTE_ARRAY;
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(byte[] value, int start, int length) throws HiveException {
bytes = Arrays.copyOfRange(value, start, start + length);
((SettableBinaryObjectInspector) this.objectInspector).set(this.obj, bytes);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, byte[] value, int start, int length) throws HiveException {
if (null == field) {
@@ -666,7 +711,7 @@ public final class VectorExpressionWrite
bytes = Arrays.copyOfRange(value, start, start + length);
((SettableBinaryObjectInspector) this.objectInspector).set(field, bytes);
return field;
- }
+ }
@Override
public Object initValue(Object ignored) {
@@ -680,20 +725,20 @@ public final class VectorExpressionWrite
SettableLongObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterLong() {
private Object obj;
-
- public VectorExpressionWriter init(SettableLongObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableLongObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(long value) throws HiveException {
((SettableLongObjectInspector) this.objectInspector).set(this.obj, value);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, long value) throws HiveException {
if (null == field) {
@@ -712,56 +757,55 @@ public final class VectorExpressionWrite
}
private static VectorExpressionWriter genVectorExpressionWritableVoid(
- VoidObjectInspector fieldObjInspector) throws HiveException {
- return new VectorExpressionWriterLong() {
- private Object obj;
-
- public VectorExpressionWriter init(VoidObjectInspector objInspector)
- throws HiveException {
- super.init(objInspector);
- this.obj = initValue(null);
- return this;
- }
-
- @Override
- public Object writeValue(long value) throws HiveException {
- return this.obj;
- }
-
- @Override
- public Object setValue(Object field, long value) throws HiveException {
- if (null == field) {
- field = initValue(null);
- }
- return field;
- }
-
- @Override
- public Object initValue(Object ignored) {
- return ((VoidObjectInspector) this.objectInspector).copyObject(null);
- }
- }.init(fieldObjInspector);
- }
-
-
+ VoidObjectInspector fieldObjInspector) throws HiveException {
+ return new VectorExpressionWriterLong() {
+ private Object obj;
+
+ public VectorExpressionWriter init(VoidObjectInspector objInspector) throws HiveException {
+ super.init(objInspector);
+ this.obj = initValue(null);
+ return this;
+ }
+
+ @Override
+ public Object writeValue(long value) throws HiveException {
+ return this.obj;
+ }
+
+ @Override
+ public Object setValue(Object field, long value) throws HiveException {
+ if (null == field) {
+ field = initValue(null);
+ }
+ return field;
+ }
+
+ @Override
+ public Object initValue(Object ignored) {
+ return ((VoidObjectInspector) this.objectInspector).copyObject(null);
+ }
+ }.init(fieldObjInspector);
+ }
+
+
private static VectorExpressionWriter genVectorExpressionWritableInt(
SettableIntObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterLong() {
private Object obj;
-
- public VectorExpressionWriter init(SettableIntObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableIntObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(long value) throws HiveException {
((SettableIntObjectInspector) this.objectInspector).set(this.obj, (int) value);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, long value) throws HiveException {
if (null == field) {
@@ -770,7 +814,7 @@ public final class VectorExpressionWrite
((SettableIntObjectInspector) this.objectInspector).set(field, (int) value);
return field;
}
-
+
@Override
public Object initValue(Object ignored) {
return ((SettableIntObjectInspector) this.objectInspector)
@@ -783,20 +827,20 @@ public final class VectorExpressionWrite
SettableShortObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterLong() {
private Object obj;
-
- public VectorExpressionWriter init(SettableShortObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableShortObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(long value) throws HiveException {
((SettableShortObjectInspector) this.objectInspector).set(this.obj, (short) value);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, long value) throws HiveException {
if (null == field) {
@@ -805,7 +849,7 @@ public final class VectorExpressionWrite
((SettableShortObjectInspector) this.objectInspector).set(field, (short) value);
return field;
}
-
+
@Override
public Object initValue(Object ignored) {
return ((SettableShortObjectInspector) this.objectInspector)
@@ -818,20 +862,20 @@ public final class VectorExpressionWrite
SettableByteObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterLong() {
private Object obj;
-
- public VectorExpressionWriter init(SettableByteObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableByteObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(long value) throws HiveException {
((SettableByteObjectInspector) this.objectInspector).set(this.obj, (byte) value);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, long value) throws HiveException {
if (null == field) {
@@ -840,7 +884,7 @@ public final class VectorExpressionWrite
((SettableByteObjectInspector) this.objectInspector).set(field, (byte) value);
return field;
}
-
+
@Override
public Object initValue(Object ignored) {
return ((SettableByteObjectInspector) this.objectInspector)
@@ -853,31 +897,31 @@ public final class VectorExpressionWrite
SettableBooleanObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterLong() {
private Object obj;
-
- public VectorExpressionWriter init(SettableBooleanObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableBooleanObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(long value) throws HiveException {
- ((SettableBooleanObjectInspector) this.objectInspector).set(this.obj,
+ ((SettableBooleanObjectInspector) this.objectInspector).set(this.obj,
value == 0 ? false : true);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, long value) throws HiveException {
if (null == field) {
field = initValue(null);
}
- ((SettableBooleanObjectInspector) this.objectInspector).set(field,
+ ((SettableBooleanObjectInspector) this.objectInspector).set(field,
value == 0 ? false : true);
return field;
}
-
+
@Override
public Object initValue(Object ignored) {
return ((SettableBooleanObjectInspector) this.objectInspector)
@@ -890,20 +934,20 @@ public final class VectorExpressionWrite
SettableDoubleObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterDouble() {
private Object obj;
-
- public VectorExpressionWriter init(SettableDoubleObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableDoubleObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(double value) throws HiveException {
((SettableDoubleObjectInspector) this.objectInspector).set(this.obj, value);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, double value) throws HiveException {
if (null == field) {
@@ -911,8 +955,8 @@ public final class VectorExpressionWrite
}
((SettableDoubleObjectInspector) this.objectInspector).set(field, value);
return field;
- }
-
+ }
+
@Override
public Object initValue(Object ignored) {
return ((SettableDoubleObjectInspector) this.objectInspector)
@@ -925,20 +969,20 @@ public final class VectorExpressionWrite
SettableFloatObjectInspector fieldObjInspector) throws HiveException {
return new VectorExpressionWriterDouble() {
private Object obj;
-
- public VectorExpressionWriter init(SettableFloatObjectInspector objInspector)
+
+ public VectorExpressionWriter init(SettableFloatObjectInspector objInspector)
throws HiveException {
super.init(objInspector);
this.obj = initValue(null);
return this;
}
-
+
@Override
public Object writeValue(double value) throws HiveException {
((SettableFloatObjectInspector) this.objectInspector).set(this.obj, (float) value);
return this.obj;
}
-
+
@Override
public Object setValue(Object field, double value) throws HiveException {
if (null == field) {
@@ -947,7 +991,7 @@ public final class VectorExpressionWrite
((SettableFloatObjectInspector) this.objectInspector).set(field, (float) value);
return field;
}
-
+
@Override
public Object initValue(Object ignored) {
return ((SettableFloatObjectInspector) this.objectInspector)
@@ -1027,25 +1071,25 @@ public final class VectorExpressionWrite
*/
public static VectorExpressionWriter[] getExpressionWriters(StructObjectInspector objInspector)
throws HiveException {
-
+
if (objInspector.isSettable()) {
return getSettableExpressionWriters((SettableStructObjectInspector) objInspector);
}
-
+
List<? extends StructField> allFieldRefs = objInspector.getAllStructFieldRefs();
-
+
VectorExpressionWriter[] expressionWriters = new VectorExpressionWriter[allFieldRefs.size()];
-
+
for(int i=0; i<expressionWriters.length; ++i) {
expressionWriters[i] = genVectorExpressionWritable(allFieldRefs.get(i).getFieldObjectInspector());
}
-
+
return expressionWriters;
}
public static VectorExpressionWriter[] getSettableExpressionWriters(
SettableStructObjectInspector objInspector) throws HiveException {
- List<? extends StructField> fieldsRef = objInspector.getAllStructFieldRefs();
+ List<? extends StructField> fieldsRef = objInspector.getAllStructFieldRefs();
VectorExpressionWriter[] writers = new VectorExpressionWriter[fieldsRef.size()];
for(int i=0; i<writers.length; ++i) {
StructField fieldRef = fieldsRef.get(i);
@@ -1054,19 +1098,19 @@ public final class VectorExpressionWrite
writers[i] = genVectorExpressionWritable(objInspector, fieldRef, baseWriter);
}
return writers;
-
+
}
-
+
/**
- * VectorExpressionWriterSetter helper for vector expression writers that use
+ * VectorExpressionWriterSetter helper for vector expression writers that use
* settable ObjectInspector fields to assign the values.
- * This is used by the OrcStruct serialization (eg. CREATE TABLE ... AS ...)
+ * This is used by the OrcStruct serialization (eg. CREATE TABLE ... AS ...)
*/
private static class VectorExpressionWriterSetter extends VectorExpressionWriterBase {
private SettableStructObjectInspector settableObjInspector;
private StructField fieldRef;
private VectorExpressionWriter baseWriter;
-
+
public VectorExpressionWriterSetter init(
SettableStructObjectInspector objInspector,
StructField fieldRef,
@@ -1087,15 +1131,15 @@ public final class VectorExpressionWrite
@Override
public Object setValue(Object row, ColumnVector column, int columnRow)
throws HiveException {
-
+
// NULLs are handled by each individual base writer setter
// We could handle NULLs centrally here but that would result in spurious allocs
-
+
Object fieldValue = this.settableObjInspector.getStructFieldData(row, fieldRef);
fieldValue = baseWriter.setValue(fieldValue, column, columnRow);
return this.settableObjInspector.setStructFieldData(row, fieldRef, fieldValue);
}
-
+
@Override
public Object initValue(Object struct) throws HiveException {
Object initValue = this.baseWriter.initValue(null);
@@ -1103,7 +1147,7 @@ public final class VectorExpressionWrite
return struct;
}
}
-
+
private static VectorExpressionWriter genVectorExpressionWritable(
SettableStructObjectInspector objInspector,
StructField fieldRef,
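Every writer in this factory follows the same pattern: the factory method builds an anonymous subclass whose init(...) stores the object inspector, pre-allocates a reusable value holder, and returns this, so each row overwrites one holder instead of allocating. A minimal, dependency-free sketch of that self-initializing pattern (all names below are illustrative, not Hive APIs):

    abstract class BytesWriter {
        abstract Object writeValue(byte[] value, int start, int length);
    }

    final class WriterSketch {
        // Factory method in the style of genVectorExpressionWritableChar: build an
        // anonymous writer, initialize its reusable state, hand back the instance.
        static BytesWriter makeUpperCaseWriter() {
            return new BytesWriter() {
                private final StringBuilder buffer = new StringBuilder(); // reused across rows

                BytesWriter init() {
                    buffer.setLength(0);
                    return this; // fluent self-init, like VectorExpressionWriter.init()
                }

                @Override
                Object writeValue(byte[] value, int start, int length) {
                    buffer.setLength(0); // overwrite, never reallocate, per row
                    for (int i = start; i < start + length; i++) {
                        buffer.append(Character.toUpperCase((char) value[i]));
                    }
                    return buffer.toString();
                }
            }.init();
        }

        public static void main(String[] args) {
            byte[] row = "hivechar".getBytes();
            System.out.println(makeUpperCaseWriter().writeValue(row, 0, row.length)); // HIVECHAR
        }
    }

The CHAR and VARCHAR writers above differ only in which settable object inspector receives the Text contents; keeping one holder per writer is what keeps allocation out of the per-row loop.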
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColCol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColCol.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColCol.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColCol.java Mon Sep 8 04:38:17 2014
@@ -127,6 +127,8 @@ public class VectorUDFDateAddColCol exte
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
// Now disregard null in second pass.
if ((inputColVector1.isRepeating) && (inputColVector2.isRepeating)) {
// All must be selected otherwise size would be zero
@@ -144,6 +146,8 @@ public class VectorUDFDateAddColCol exte
}
}
break;
+ default:
+ throw new Error("Unsupported input type " + inputTypes[0].name());
}
}
@@ -235,8 +239,8 @@ public class VectorUDFDateAddColCol exte
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.ANY,
- VectorExpressionDescriptor.ArgumentType.LONG)
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.INT_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.COLUMN,
VectorExpressionDescriptor.InputExpressionType.COLUMN);
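The getDescriptor() change at the end of this file is the other half of the CHAR/VARCHAR support: instead of advertising ANY/LONG, the expression now declares the argument families it actually handles, so the vectorizer never routes an unsupported type into the new default-throw branch. Conceptually a family is just a set of concrete types; a rough, dependency-free sketch of the matching (illustrative only, not Hive's actual VectorExpressionDescriptor):

    import java.util.EnumSet;

    enum ArgType { STRING, CHAR, VARCHAR, DATE, TIMESTAMP, INT, LONG }

    final class FamilySketch {
        // STRING_DATETIME_FAMILY admits CHAR and VARCHAR alongside STRING and the
        // datetime types; INT_FAMILY replaces the old bare LONG.
        static final EnumSet<ArgType> STRING_DATETIME_FAMILY = EnumSet.of(
            ArgType.STRING, ArgType.CHAR, ArgType.VARCHAR, ArgType.DATE, ArgType.TIMESTAMP);
        static final EnumSet<ArgType> INT_FAMILY = EnumSet.of(ArgType.INT, ArgType.LONG);

        // A date_add(col, n) call vectorizes only when both arguments match.
        static boolean accepts(ArgType first, ArgType second) {
            return STRING_DATETIME_FAMILY.contains(first) && INT_FAMILY.contains(second);
        }

        public static void main(String[] args) {
            System.out.println(accepts(ArgType.CHAR, ArgType.INT)); // true: newly supported
            System.out.println(accepts(ArgType.INT, ArgType.INT));  // false: stays in row mode
        }
    }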
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColScalar.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColScalar.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColScalar.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddColScalar.java Mon Sep 8 04:38:17 2014
@@ -165,6 +165,8 @@ public class VectorUDFDateAddColScalar e
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
if (inputCol.noNulls) {
outV.noNulls = true;
if (batch.selectedInUse) {
@@ -199,6 +201,8 @@ public class VectorUDFDateAddColScalar e
}
}
break;
+ default:
+ throw new Error("Unsupported input type " + inputTypes[0].name());
}
}
@@ -286,8 +290,8 @@ public class VectorUDFDateAddColScalar e
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.ANY,
- VectorExpressionDescriptor.ArgumentType.LONG)
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.INT_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.COLUMN,
VectorExpressionDescriptor.InputExpressionType.SCALAR);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddScalarCol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddScalarCol.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddScalarCol.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateAddScalarCol.java Mon Sep 8 04:38:17 2014
@@ -84,6 +84,8 @@ public class VectorUDFDateAddScalarCol e
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
try {
baseDate = formatter.parse(new String(stringValue, "UTF-8"));
break;
@@ -101,6 +103,8 @@ public class VectorUDFDateAddScalarCol e
}
return;
}
+ default:
+ throw new Error("Unsupported input type " + inputTypes[0].name());
}
if(batch.size == 0) {
@@ -213,8 +217,8 @@ public class VectorUDFDateAddScalarCol e
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.ANY,
- VectorExpressionDescriptor.ArgumentType.LONG)
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.INT_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.SCALAR,
VectorExpressionDescriptor.InputExpressionType.COLUMN);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColCol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColCol.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColCol.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColCol.java Mon Sep 8 04:38:17 2014
@@ -191,12 +191,14 @@ public class VectorUDFDateDiffColCol ext
return dateVector;
case STRING:
+ case CHAR:
+ case VARCHAR:
BytesColumnVector bcv = (BytesColumnVector) inputColVector;
copySelected(bcv, batch.selectedInUse, batch.selected, batch.size, dateVector);
return dateVector;
+ default:
+ throw new Error("Unsupported input type " + colType.name());
}
-
- return null;
}
// Copy the current object contents into the output. Only copy selected entries,
@@ -314,8 +316,8 @@ public class VectorUDFDateDiffColCol ext
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.ANY,
- VectorExpressionDescriptor.ArgumentType.ANY)
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.COLUMN,
VectorExpressionDescriptor.InputExpressionType.COLUMN);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColScalar.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColScalar.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColScalar.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffColScalar.java Mon Sep 8 04:38:17 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.vector.expressions;
+import org.apache.hadoop.hive.metastore.parser.ExpressionTree.Operator;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
@@ -90,6 +91,8 @@ public class VectorUDFDateDiffColScalar
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
try {
date.setTime(formatter.parse(new String(stringValue, "UTF-8")).getTime());
baseDate = DateWritable.dateToDays(date);
@@ -108,6 +111,8 @@ public class VectorUDFDateDiffColScalar
}
return;
}
+ default:
+ throw new Error("Invalid input type #1: " + inputTypes[1].name());
}
switch (inputTypes[0]) {
@@ -184,6 +189,8 @@ public class VectorUDFDateDiffColScalar
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
if (inputCol.noNulls) {
outV.noNulls = true;
if (batch.selectedInUse) {
@@ -218,6 +225,8 @@ public class VectorUDFDateDiffColScalar
}
}
break;
+ default:
+ throw new Error("Invalid input type #0: " + inputTypes[0].name());
}
}
@@ -287,8 +296,8 @@ public class VectorUDFDateDiffColScalar
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.ANY,
- VectorExpressionDescriptor.ArgumentType.ANY)
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.COLUMN,
VectorExpressionDescriptor.InputExpressionType.SCALAR);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffScalarCol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffScalarCol.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffScalarCol.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateDiffScalarCol.java Mon Sep 8 04:38:17 2014
@@ -90,6 +90,8 @@ public class VectorUDFDateDiffScalarCol
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
try {
date.setTime(formatter.parse(new String(stringValue, "UTF-8")).getTime());
baseDate = DateWritable.dateToDays(date);
@@ -108,6 +110,8 @@ public class VectorUDFDateDiffScalarCol
}
return;
}
+ default:
+ throw new Error("Unsupported input type " + inputTypes[0].name());
}
switch (inputTypes[1]) {
@@ -184,6 +188,8 @@ public class VectorUDFDateDiffScalarCol
break;
case STRING:
+ case CHAR:
+ case VARCHAR:
if (inputCol.noNulls) {
outV.noNulls = true;
if (batch.selectedInUse) {
@@ -218,6 +224,8 @@ public class VectorUDFDateDiffScalarCol
}
}
break;
+ default:
+ throw new Error("Unsupported input type " + inputTypes[1].name());
}
}
@@ -287,8 +295,8 @@ public class VectorUDFDateDiffScalarCol
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(2)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.ANY,
- VectorExpressionDescriptor.ArgumentType.ANY)
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY,
+ VectorExpressionDescriptor.ArgumentType.STRING_DATETIME_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.SCALAR,
VectorExpressionDescriptor.InputExpressionType.COLUMN);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateLong.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateLong.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateLong.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFDateLong.java Mon Sep 8 04:38:17 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec.vector.expressions;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import java.io.UnsupportedEncodingException;
@@ -49,6 +50,8 @@ public class VectorUDFDateLong extends L
case TIMESTAMP:
date.setTime(vector[i] / 1000000);
break;
+ default:
+ throw new Error("Unsupported input type " + inputTypes[0].name());
}
try {
byte[] bytes = formatter.format(date).getBytes("UTF-8");
@@ -58,4 +61,16 @@ public class VectorUDFDateLong extends L
outV.isNull[i] = true;
}
}
+
+ @Override
+ public VectorExpressionDescriptor.Descriptor getDescriptor() {
+ VectorExpressionDescriptor.Builder b = new VectorExpressionDescriptor.Builder();
+ b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
+ .setNumArguments(1)
+ .setArgumentTypes(
+ VectorExpressionDescriptor.ArgumentType.DATETIME_FAMILY)
+ .setInputExpressionTypes(
+ VectorExpressionDescriptor.InputExpressionType.COLUMN);
+ return b.build();
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldLong.java Mon Sep 8 04:38:17 2014
@@ -179,6 +179,8 @@ public abstract class VectorUDFTimestamp
}
}
break;
+ default:
+ throw new Error("Unsupported input type " + inputTypes[0].name());
}
}
@@ -218,7 +220,7 @@ public abstract class VectorUDFTimestamp
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(1)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.LONG)
+ VectorExpressionDescriptor.ArgumentType.DATETIME_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.COLUMN);
return b.build();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldString.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldString.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldString.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorUDFTimestampFieldString.java Mon Sep 8 04:38:17 2014
@@ -181,7 +181,7 @@ public abstract class VectorUDFTimestamp
b.setMode(VectorExpressionDescriptor.Mode.PROJECTION)
.setNumArguments(1)
.setArgumentTypes(
- VectorExpressionDescriptor.ArgumentType.STRING)
+ VectorExpressionDescriptor.ArgumentType.STRING_FAMILY)
.setInputExpressionTypes(
VectorExpressionDescriptor.InputExpressionType.COLUMN);
return b.build();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java Mon Sep 8 04:38:17 2014
@@ -23,6 +23,7 @@ import java.sql.Timestamp;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -30,8 +31,12 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.Text;
/**
@@ -215,6 +220,35 @@ public class VectorUDFAdaptor extends Ve
t = ((WritableStringObjectInspector) outputOI).getPrimitiveWritableObject(value);
}
bv.setVal(i, t.getBytes(), 0, t.getLength());
+ } else if (outputOI instanceof WritableHiveCharObjectInspector) {
+ WritableHiveCharObjectInspector writableHiveCharObjectOI = (WritableHiveCharObjectInspector) outputOI;
+ int maxLength = ((CharTypeInfo) writableHiveCharObjectOI.getTypeInfo()).getLength();
+ BytesColumnVector bv = (BytesColumnVector) colVec;
+
+ HiveCharWritable hiveCharWritable;
+ if (value instanceof HiveCharWritable) {
+ hiveCharWritable = ((HiveCharWritable) value);
+ } else {
+ hiveCharWritable = writableHiveCharObjectOI.getPrimitiveWritableObject(value);
+ }
+ Text t = hiveCharWritable.getTextValue();
+
+ // In vector mode, we stored CHAR as unpadded.
+ StringExpr.rightTrimAndTruncate(bv, i, t.getBytes(), 0, t.getLength(), maxLength);
+ } else if (outputOI instanceof WritableHiveVarcharObjectInspector) {
+ WritableHiveVarcharObjectInspector writableHiveVarcharObjectOI = (WritableHiveVarcharObjectInspector) outputOI;
+ int maxLength = ((VarcharTypeInfo) writableHiveVarcharObjectOI.getTypeInfo()).getLength();
+ BytesColumnVector bv = (BytesColumnVector) colVec;
+
+ HiveVarcharWritable hiveVarcharWritable;
+ if (value instanceof HiveVarcharWritable) {
+ hiveVarcharWritable = ((HiveVarcharWritable) value);
+ } else {
+ hiveVarcharWritable = writableHiveVarcharObjectOI.getPrimitiveWritableObject(value);
+ }
+ Text t = hiveVarcharWritable.getTextValue();
+
+ StringExpr.truncate(bv, i, t.getBytes(), 0, t.getLength(), maxLength);
} else if (outputOI instanceof WritableIntObjectInspector) {
LongColumnVector lv = (LongColumnVector) colVec;
if (value instanceof Integer) {
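The comment above ("we stored CHAR as unpadded") is the invariant the adaptor relies on: before a CHAR value enters a BytesColumnVector it is right-trimmed of trailing blanks and truncated to its declared maximum length, while VARCHAR is only truncated. A standalone sketch of that normalization, assuming StringExpr.rightTrimAndTruncate has these semantics for single-byte characters (the real code must also truncate on UTF-8 character boundaries):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    final class CharNormalizeSketch {
        static byte[] rightTrimAndTruncate(byte[] bytes, int start, int length, int maxLength) {
            int end = start + length;
            while (end > start && bytes[end - 1] == ' ') {
                end--; // drop trailing blanks: CHAR is stored unpadded in vector mode
            }
            int kept = Math.min(end - start, maxLength); // enforce the CHAR(n)/VARCHAR(n) bound
            return Arrays.copyOfRange(bytes, start, start + kept);
        }

        public static void main(String[] args) {
            byte[] padded = "abc   ".getBytes(StandardCharsets.UTF_8);
            byte[] out = rightTrimAndTruncate(padded, 0, padded.length, 2);
            System.out.println(new String(out, StandardCharsets.UTF_8)); // "ab"
        }
    }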
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookUtils.java Mon Sep 8 04:38:17 2014
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.exec.Utilities;
public class HookUtils {
/**
@@ -57,7 +58,7 @@ public class HookUtils {
String[] hookClasses = csHooks.split(",");
for (String hookClass : hookClasses) {
T hook = (T) Class.forName(hookClass.trim(), true,
- JavaUtils.getClassLoader()).newInstance();
+ Utilities.getSessionSpecifiedClassLoader()).newInstance();
hooks.add(hook);
}
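Swapping JavaUtils.getClassLoader() for Utilities.getSessionSpecifiedClassLoader() presumably lets hook classes resolve from jars added to the current session rather than only from the loader Hive started with. The loading pattern itself is plain reflection over a comma-separated config value; a dependency-free sketch (the class names in main are stand-ins for hook implementations):

    import java.util.ArrayList;
    import java.util.List;

    final class HookLoaderSketch {
        static <T> List<T> loadHooks(String csv, Class<T> type) throws Exception {
            List<T> hooks = new ArrayList<T>();
            if (csv == null || csv.trim().isEmpty()) {
                return hooks;
            }
            // The real code passes a session-specific loader here so that
            // classes from session-added jars are visible.
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            for (String name : csv.split(",")) {
                Class<?> cls = Class.forName(name.trim(), true, loader);
                hooks.add(type.cast(cls.newInstance()));
            }
            return hooks;
        }

        public static void main(String[] args) throws Exception {
            for (Object h : loadHooks("java.lang.StringBuilder, java.util.ArrayList", Object.class)) {
                System.out.println(h.getClass().getName());
            }
        }
    }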
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -52,6 +52,7 @@ public interface AcidOutputFormat<K exte
private int bucket;
private PrintStream dummyStream = null;
private boolean oldStyle = false;
+ private int recIdCol = -1; // Column the record identifier is in, -1 indicates no record id
/**
* Create the options object.
@@ -164,6 +165,16 @@ public interface AcidOutputFormat<K exte
}
/**
+ * Which column the row id field is in.
+ * @param recIdCol
+ * @return this
+ */
+ public Options recordIdColumn(int recIdCol) {
+ this.recIdCol = recIdCol;
+ return this;
+ }
+
+ /**
* Temporary switch while we are in development that replaces the
* implementation with a dummy one that just prints to stream.
* @param stream the stream to print to
@@ -214,6 +225,10 @@ public interface AcidOutputFormat<K exte
return bucket;
}
+ public int getRecordIdColumn() {
+ return recIdCol;
+ }
+
public PrintStream getDummyStream() {
return dummyStream;
}
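Options is a fluent builder, so the new recordIdColumn setter composes with the existing ones; the -1 default means the rows carry no record identifier. A sketch of a typical configuration, mirroring the chain this same commit adds in HiveFileFormatUtils.getRecordUpdater further below (assumes these Hive classes on the classpath):

    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.mapred.JobConf;

    final class OptionsSketch {
        static AcidOutputFormat.Options forUpdate(JobConf jc, ObjectInspector inspector,
                                                  int bucket, long txnId, int rowIdColNum) {
            return new AcidOutputFormat.Options(jc)
                .inspector(inspector)
                .bucket(bucket)
                .minimumTransactionId(txnId)
                .maximumTransactionId(txnId)
                .recordIdColumn(rowIdColNum); // -1 when rows carry no record identifier
        }
    }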
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Sep 8 04:38:17 2014
@@ -164,6 +164,8 @@ public class AcidUtils {
return result;
}
+ public enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }
+
public static interface Directory {
/**
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Mon Sep 8 04:38:17 2014
@@ -18,14 +18,13 @@
package org.apache.hadoop.hive.ql.io;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.lib.HashPartitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class DefaultHivePartitioner<K2, V2> extends HashPartitioner<K2, V2> implements HivePartitioner<K2, V2> {
/** Use {@link Object#hashCode()} to partition. */
+ @Override
public int getBucket(K2 key, V2 value, int numBuckets) {
return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Mon Sep 8 04:38:17 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -249,21 +250,8 @@ public final class HiveFileFormatUtils {
public static RecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
- boolean storagehandlerofhivepassthru = false;
- HiveOutputFormat<?, ?> hiveOutputFormat;
+ HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
try {
- if (tableInfo.getJobProperties() != null) {
- if (tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
- jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
- storagehandlerofhivepassthru = true;
- }
- }
- if (storagehandlerofhivepassthru) {
- hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(),jc);
- }
- else {
- hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
- }
boolean isCompressed = conf.getCompressed();
JobConf jc_output = jc;
if (isCompressed) {
@@ -299,6 +287,73 @@ public final class HiveFileFormatUtils {
return null;
}
+ private static HiveOutputFormat<?, ?> getHiveOutputFormat(JobConf jc, TableDesc tableInfo)
+ throws HiveException {
+ boolean storagehandlerofhivepassthru = false;
+ HiveOutputFormat<?, ?> hiveOutputFormat;
+ try {
+ if (tableInfo.getJobProperties() != null) {
+ if (tableInfo.getJobProperties().get(
+ HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+ jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
+ tableInfo.getJobProperties()
+ .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+ storagehandlerofhivepassthru = true;
+ }
+ }
+ if (storagehandlerofhivepassthru) {
+ return ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(), jc);
+ } else {
+ return tableInfo.getOutputFileFormatClass().newInstance();
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket,
+ FileSinkDesc conf, Path outPath,
+ ObjectInspector inspector,
+ Reporter reporter, int rowIdColNum)
+ throws HiveException, IOException {
+ HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
+ AcidOutputFormat<?, ?> acidOutputFormat = null;
+ if (hiveOutputFormat instanceof AcidOutputFormat) {
+ acidOutputFormat = (AcidOutputFormat)hiveOutputFormat;
+ } else {
+ throw new HiveException("Unable to create RecordUpdater for HiveOutputFormat that does not " +
+ "implement AcidOutputFormat");
+ }
+ // TODO not 100% sure about this. This call doesn't set the compression type in the conf
+ // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
+ // sure if this is correct or not.
+ return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
+ bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
+ }
+
+
+ private static RecordUpdater getRecordUpdater(JobConf jc,
+ AcidOutputFormat<?, ?> acidOutputFormat,
+ boolean isCompressed,
+ long txnId,
+ int bucket,
+ ObjectInspector inspector,
+ Properties tableProp,
+ Path outPath,
+ Reporter reporter,
+ int rowIdColNum) throws IOException {
+ return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
+ .isCompressed(isCompressed)
+ .tableProperties(tableProp)
+ .reporter(reporter)
+ .writingBase(false)
+ .minimumTransactionId(txnId)
+ .maximumTransactionId(txnId)
+ .bucket(bucket)
+ .inspector(inspector)
+ .recordIdColumn(rowIdColNum));
+ }
+
public static PartitionDesc getPartitionDescFromPathRecursively(
Map<String, PartitionDesc> pathToPartitionInfo, Path dir,
Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cacheMap)
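For completeness, a sketch of how a writer-side caller might obtain an updater through the new helper; only the getAcidRecordUpdater signature is from this commit, the surrounding wiring is hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
    import org.apache.hadoop.hive.ql.io.RecordUpdater;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;

    final class AcidSinkSketch {
        static RecordUpdater open(JobConf jc, TableDesc table, FileSinkDesc sink, Path out,
                                  ObjectInspector oi, int bucket, int rowIdCol)
                throws HiveException, IOException {
            // Throws HiveException unless the table's output format implements
            // AcidOutputFormat; the transaction id comes from sink.getTransactionId().
            return HiveFileFormatUtils.getAcidRecordUpdater(jc, table, bucket, sink, out,
                oi, Reporter.NULL, rowIdCol);
        }
    }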
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -78,7 +78,7 @@ public class HiveIgnoreKeyTextOutputForm
final int finalRowSeparator = rowSeparator;
FileSystem fs = outPath.getFileSystem(jc);
final OutputStream outStream = Utilities.createCompressedStream(jc,
- fs.create(outPath, progress), isCompressed);
+ fs.create(outPath, progress), isCompressed);
return new RecordWriter() {
@Override
public void write(Writable r) throws IOException {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -54,7 +54,7 @@ public class HiveNullValueSequenceFileOu
FileSystem fs = finalOutPath.getFileSystem(jc);
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc, fs, finalOutPath,
- HiveKey.class, NullWritable.class, isCompressed, progress);
+ HiveKey.class, NullWritable.class, isCompressed, progress);
keyWritable = new HiveKey();
keyIsText = valueClass.equals(Text.class);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -65,7 +66,7 @@ public class HivePassThroughOutputFormat
{
cls =
(Class<? extends OutputFormat>) Class.forName(actualOutputFormatClass, true,
- JavaUtils.getClassLoader());
+ Utilities.getSessionSpecifiedClassLoader());
} else {
throw new RuntimeException("Null pointer detected in actualOutputFormatClass");
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -62,7 +62,7 @@ public class HiveSequenceFileOutputForma
FileSystem fs = finalOutPath.getFileSystem(jc);
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc, fs, finalOutPath,
- BytesWritable.class, valueClass, isCompressed, progress);
+ BytesWritable.class, valueClass, isCompressed, progress);
return new RecordWriter() {
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -133,7 +133,7 @@ public class RCFileOutputFormat extends
RCFileOutputFormat.setColumnNumber(jc, cols.length);
final RCFile.Writer outWriter = Utilities.createRCFileWriter(jc,
- finalOutPath.getFileSystem(jc), finalOutPath, isCompressed, progress);
+ finalOutPath.getFileSystem(jc), finalOutPath, isCompressed, progress);
return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Mon Sep 8 04:38:17 2014
@@ -64,7 +64,7 @@ public class RCFileRecordReader<K extend
private final Map<String, RCFileSyncEntry> cache;
public RCFileSyncCache() {
- cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
+ cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
}
public void put(FileSplit split, long endSync) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java Mon Sep 8 04:38:17 2014
@@ -40,26 +40,17 @@ public interface RecordUpdater {
/**
* Update an old record with a new set of values.
* @param currentTransaction the current transaction id
- * @param originalTransaction the row's original transaction id
- * @param rowId the original row id
* @param row the new values for the row
* @throws IOException
*/
- void update(long currentTransaction,
- long originalTransaction,
- long rowId,
- Object row) throws IOException;
+ void update(long currentTransaction, Object row) throws IOException;
/**
* Delete a row from the table.
* @param currentTransaction the current transaction id
- * @param originalTransaction the rows original transaction id
- * @param rowId the original row id
* @throws IOException
*/
- void delete(long currentTransaction,
- long originalTransaction,
- long rowId) throws IOException;
+ void delete(long currentTransaction, Object row) throws IOException;
/**
* Flush the current set of rows to the underlying file system, so that
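With originalTransaction and rowId dropped from the signatures, the record identifier now travels inside the row itself (see the OrcRecordUpdater changes below). A minimal caller-side sketch, assuming an updater that implements this revised interface; the row objects here are placeholders rather than Hive's actual row representation:

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.RecordUpdater;

    class RecordUpdaterUsage {
      void apply(RecordUpdater updater, long txnId,
                 Object newRow, Object rowToDelete) throws IOException {
        updater.insert(txnId, newRow);      // insert keeps its (txn, row) shape
        updater.update(txnId, newRow);      // origTxn/rowId are read from the row
        updater.delete(txnId, rowToDelete); // delete now takes the row, to find its recId
        updater.flush();                    // make the buffered rows durable
      }
    }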
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeTask.java Mon Sep 8 04:38:17 2014
@@ -373,6 +373,10 @@ public class MergeTask extends Task<Merg
}
}
+ if (format == null || format.trim().equals("")) {
+ printUsage();
+ }
+
MergeWork mergeWork = null;
if (format.equals("rcfile")) {
mergeWork = new MergeWork(inputPaths, new Path(outputDir), RCFileInputFormat.class);
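The new guard assumes printUsage() terminates the process; if it only prints, format would still be null at the format.equals("rcfile") test in this hunk. A hardened sketch, where checkFormat and the exception choice are hypothetical rather than the committed code:

    // Hypothetical variant that fails fast even if printUsage() does not exit.
    private static void checkFormat(String format) {
      if (format == null || format.trim().isEmpty()) {
        printUsage(); // show the expected command-line arguments
        throw new IllegalArgumentException("merge file format must be specified");
      }
    }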
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Sep 8 04:38:17 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.io.Reco
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -279,10 +280,10 @@ public class OrcInputFormat implements
} else {
SearchArgument sarg;
if (serializedPushdown != null) {
- sarg = SearchArgument.FACTORY.create
+ sarg = SearchArgumentFactory.create
(Utilities.deserializeExpression(serializedPushdown));
} else {
- sarg = SearchArgument.FACTORY.create(sargPushdown);
+ sarg = SearchArgumentFactory.create(sargPushdown);
}
LOG.info("ORC pushdown predicate: " + sarg);
String[] neededColumnNames = columnNamesString.split(",");
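SearchArgument.FACTORY gives way to the static SearchArgumentFactory entry point; both create overloads used here take either a deserialized expression or the serialized pushdown string. The same factory also exposes a fluent builder, which is convenient for constructing predicates in tests; a minimal sketch, with the builder method shapes recalled from the 0.14-era API and therefore an assumption:

    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

    class SargBuilderSketch {
      static SearchArgument example() {
        return SearchArgumentFactory.newBuilder()
            .startAnd()
              .lessThan("x", 10)   // column "x" < 10
              .isNull("y")         // column "y" IS NULL
            .end()
            .build();
      }
    }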
@@ -910,31 +911,31 @@ public class OrcInputFormat implements
static List<OrcSplit> generateSplitsInfo(Configuration conf)
throws IOException {
- // use threads to resolve directories into splits
- Context context = new Context(conf);
- for(Path dir: getInputPaths(conf)) {
- FileSystem fs = dir.getFileSystem(conf);
- context.schedule(new FileGenerator(context, fs, dir));
- }
- context.waitForTasks();
- // deal with exceptions
- if (!context.errors.isEmpty()) {
- List<IOException> errors =
- new ArrayList<IOException>(context.errors.size());
- for(Throwable th: context.errors) {
- if (th instanceof IOException) {
- errors.add((IOException) th);
- } else {
- throw new RuntimeException("serious problem", th);
- }
- }
- throw new InvalidInputException(errors);
- }
+ // use threads to resolve directories into splits
+ Context context = new Context(conf);
+ for(Path dir: getInputPaths(conf)) {
+ FileSystem fs = dir.getFileSystem(conf);
+ context.schedule(new FileGenerator(context, fs, dir));
+ }
+ context.waitForTasks();
+ // deal with exceptions
+ if (!context.errors.isEmpty()) {
+ List<IOException> errors =
+ new ArrayList<IOException>(context.errors.size());
+ for(Throwable th: context.errors) {
+ if (th instanceof IOException) {
+ errors.add((IOException) th);
+ } else {
+ throw new RuntimeException("serious problem", th);
+ }
+ }
+ throw new InvalidInputException(errors);
+ }
if (context.cacheStripeDetails) {
LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
+ context.numFilesCounter.get());
}
- return context.splits;
+ return context.splits;
}
@Override
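The generateSplitsInfo hunk above is an indentation fix only; the logic (one FileGenerator scheduled per input directory, a barrier, then error triage) is unchanged. A self-contained sketch of that scatter/wait/collect pattern, with a hypothetical per-directory scan() standing in for FileGenerator; InvalidInputException is org.apache.hadoop.mapred's:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.InvalidInputException;

    class SplitScanSketch {
      void scanAll(List<Path> dirs) throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final Path dir : dirs) {
          futures.add(pool.submit(new Callable<Void>() {
            public Void call() throws IOException {
              scan(dir); // hypothetical per-directory work
              return null;
            }
          }));
        }
        List<IOException> errors = new ArrayList<IOException>();
        for (Future<?> f : futures) {
          try {
            f.get(); // barrier: wait for every directory
          } catch (ExecutionException e) {
            Throwable th = e.getCause();
            if (th instanceof IOException) {
              errors.add((IOException) th); // collect recoverable I/O errors
            } else {
              throw new RuntimeException("serious problem", th); // anything else is fatal
            }
          }
        }
        pool.shutdown();
        if (!errors.isEmpty()) {
          throw new InvalidInputException(errors); // surface all I/O errors at once
        }
      }

      void scan(Path dir) throws IOException { /* hypothetical */ }
    }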
@@ -998,14 +999,14 @@ public class OrcInputFormat implements
((FileSplit) inputSplit).getPath(),
OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
}
-
+
OrcSplit split = (OrcSplit) inputSplit;
reporter.setStatus(inputSplit.toString());
Options options = new Options(conf).reporter(reporter);
final RowReader<OrcStruct> inner = getReader(inputSplit, options);
-
-
+
+
/* Even though there are no delta files, we still need to produce row ids so that an
* UPDATE or DELETE statement would work on a table that hasn't had any previous updates. */
if (split.isOriginal() && split.getDeltas().isEmpty()) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Mon Sep 8 04:38:17 2014
@@ -211,18 +211,14 @@ public class OrcOutputFormat extends Fil
}
@Override
- public void update(long currentTransaction, long originalTransaction,
- long rowId, Object row) throws IOException {
+ public void update(long currentTransaction, Object row) throws IOException {
out.println("update " + path + " currTxn: " + currentTransaction +
- " origTxn: " + originalTransaction + " row: " + rowId + " obj: " +
- stringifyObject(row, inspector));
+ " obj: " + stringifyObject(row, inspector));
}
@Override
- public void delete(long currentTransaction, long originalTransaction,
- long rowId) throws IOException {
- out.println("delete " + path + " currTxn: " + currentTransaction +
- " origTxn: " + originalTransaction + " row: " + rowId);
+ public void delete(long currentTransaction, Object row) throws IOException {
+ out.println("delete " + path + " currTxn: " + currentTransaction + " obj: " + row);
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java Mon Sep 8 04:38:17 2014
@@ -31,18 +31,18 @@ import org.apache.hadoop.hive.ql.io.Reco
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
import java.util.ArrayList;
import java.util.List;
@@ -92,6 +92,14 @@ public class OrcRecordUpdater implements
// because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0;
private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+ private StructField recIdField = null; // field to look for the record identifier in
+ private StructField rowIdField = null; // field inside recId to look for row id in
+ private StructField originalTxnField = null; // field inside recId to look for original txn in
+ private StructObjectInspector rowInspector; // OI for the original row
+ private StructObjectInspector recIdInspector; // OI for the record identifier struct
+ private LongObjectInspector rowIdInspector; // OI for the long row id inside the recordIdentifier
+ private LongObjectInspector origTxnInspector; // OI for the original txn inside the record
+ // identifier
static class AcidStats {
long inserts;
@@ -179,7 +187,7 @@ public class OrcRecordUpdater implements
* @param rowInspector the row's object inspector
* @return an object inspector for the event stream
*/
- static ObjectInspector createEventSchema(ObjectInspector rowInspector) {
+ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
List<StructField> fields = new ArrayList<StructField>();
fields.add(new OrcStruct.Field("operation",
PrimitiveObjectInspectorFactory.writableIntObjectInspector, OPERATION));
@@ -237,7 +245,9 @@ public class OrcRecordUpdater implements
writerOptions.bufferSize(DELTA_BUFFER_SIZE);
writerOptions.stripeSize(DELTA_STRIPE_SIZE);
}
- writerOptions.inspector(createEventSchema(options.getInspector()));
+ rowInspector = (StructObjectInspector)options.getInspector();
+ writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+ options.getRecordIdColumn())));
this.writer = OrcFile.createWriter(this.path, writerOptions);
item = new OrcStruct(FIELDS);
item.setFieldValue(OPERATION, operation);
@@ -247,14 +257,50 @@ public class OrcRecordUpdater implements
item.setFieldValue(ROW_ID, rowId);
}
- private void addEvent(int operation, long currentTransaction,
- long originalTransaction, long rowId,
- Object row) throws IOException {
+ // Find the record identifier column (if present) and return a possibly new ObjectInspector
+ // that will strip out the record id for the underlying writer.
+ private ObjectInspector findRecId(ObjectInspector inspector, int rowIdColNum) {
+ if (!(inspector instanceof StructObjectInspector)) {
+ throw new RuntimeException("Serious problem, expected a StructObjectInspector, but got a " +
+ inspector.getClass().getName());
+ }
+ if (rowIdColNum < 0) {
+ return inspector;
+ } else {
+ RecIdStrippingObjectInspector newInspector =
+ new RecIdStrippingObjectInspector(inspector, rowIdColNum);
+ recIdField = newInspector.getRecId();
+ List<? extends StructField> fields =
+ ((StructObjectInspector) recIdField.getFieldObjectInspector()).getAllStructFieldRefs();
+ // Go by position, not field name, as field names aren't guaranteed. The order of fields
+ // in RecordIdentifier is transactionId, bucketId, rowId
+ originalTxnField = fields.get(0);
+ origTxnInspector = (LongObjectInspector)originalTxnField.getFieldObjectInspector();
+ rowIdField = fields.get(2);
+ rowIdInspector = (LongObjectInspector)rowIdField.getFieldObjectInspector();
+
+
+ recIdInspector = (StructObjectInspector) recIdField.getFieldObjectInspector();
+ return newInspector;
+ }
+ }
+
+ private void addEvent(int operation, long currentTransaction, long rowId, Object row)
+ throws IOException {
this.operation.set(operation);
this.currentTransaction.set(currentTransaction);
- this.originalTransaction.set(originalTransaction);
+ // If this is an insert, originalTransaction should be set to this transaction. If not,
+ // the if block below resets it anyway.
+ long originalTransaction = currentTransaction;
+ if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+ Object rowIdValue = rowInspector.getStructFieldData(row, recIdField);
+ originalTransaction = origTxnInspector.get(
+ recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
+ rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue, rowIdField));
+ }
this.rowId.set(rowId);
- item.setFieldValue(OrcRecordUpdater.ROW, row);
+ this.originalTransaction.set(originalTransaction);
+ item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ? null : row));
indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
writer.addRow(item);
}
@@ -264,30 +310,26 @@ public class OrcRecordUpdater implements
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(INSERT_OPERATION, currentTransaction, currentTransaction,
- insertedRows++, row);
+ addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
rowCountDelta++;
}
@Override
- public void update(long currentTransaction, long originalTransaction,
- long rowId, Object row) throws IOException {
+ public void update(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(UPDATE_OPERATION, currentTransaction, originalTransaction, rowId,
- row);
+ addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
}
@Override
- public void delete(long currentTransaction, long originalTransaction,
- long rowId) throws IOException {
+ public void delete(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(DELETE_OPERATION, currentTransaction, originalTransaction, rowId,
- null);
+ addEvent(DELETE_OPERATION, currentTransaction, -1, row);
rowCountDelta--;
+
}
@Override
@@ -311,7 +353,7 @@ public class OrcRecordUpdater implements
fs.delete(path, false);
}
} else {
- writer.close();
+ if (writer != null) writer.close();
}
if (flushLengths != null) {
flushLengths.close();
@@ -406,4 +448,67 @@ public class OrcRecordUpdater implements
lastRowId = rowId;
}
}
+
+ /**
+ * An ObjectInspector that will strip out the record identifier so that the underlying writer
+ * doesn't see it.
+ */
+ private static class RecIdStrippingObjectInspector extends StructObjectInspector {
+ private StructObjectInspector wrapped;
+ List<StructField> fields;
+ StructField recId;
+
+ RecIdStrippingObjectInspector(ObjectInspector oi, int rowIdColNum) {
+ if (!(oi instanceof StructObjectInspector)) {
+ throw new RuntimeException("Serious problem, expected a StructObjectInspector, " +
+ "but got a " + oi.getClass().getName());
+ }
+ wrapped = (StructObjectInspector)oi;
+ List<? extends StructField> wrappedFields = wrapped.getAllStructFieldRefs();
+ fields = new ArrayList<StructField>(wrapped.getAllStructFieldRefs().size());
+ for (int i = 0; i < wrappedFields.size(); i++) {
+ if (i == rowIdColNum) {
+ recId = wrappedFields.get(i);
+ } else {
+ fields.add(wrappedFields.get(i));
+ }
+ }
+ }
+
+ @Override
+ public List<? extends StructField> getAllStructFieldRefs() {
+ return fields;
+ }
+
+ @Override
+ public StructField getStructFieldRef(String fieldName) {
+ return wrapped.getStructFieldRef(fieldName);
+ }
+
+ @Override
+ public Object getStructFieldData(Object data, StructField fieldRef) {
+ // For performance, don't check that the fieldRef isn't recId every time;
+ // just assume that the caller used getAllStructFieldRefs and thus doesn't have that fieldRef
+ return wrapped.getStructFieldData(data, fieldRef);
+ }
+
+ @Override
+ public List<Object> getStructFieldsDataAsList(Object data) {
+ return wrapped.getStructFieldsDataAsList(data);
+ }
+
+ @Override
+ public String getTypeName() {
+ return wrapped.getTypeName();
+ }
+
+ @Override
+ public Category getCategory() {
+ return wrapped.getCategory();
+ }
+
+ StructField getRecId() {
+ return recId;
+ }
+ }
}
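findRecId() above deliberately reads the record-identifier struct by position (0 = transactionId, 1 = bucketId, 2 = rowId) rather than by field name, since names aren't guaranteed. A self-contained sketch of that positional read, built on standard object inspectors purely for illustration; the field names and sample values are invented:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class RecIdByPosition {
      public static void main(String[] args) {
        // Stand-in for the record-identifier struct: (transactionId, bucketId, rowId).
        StructObjectInspector recIdOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("transactionid", "bucketid", "rowid"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector,
                    PrimitiveObjectInspectorFactory.javaIntObjectInspector,
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector));
        List<Object> recId = Arrays.<Object>asList(7L, 0, 42L); // a sample identifier
        List<? extends StructField> f = recIdOI.getAllStructFieldRefs();
        long origTxn = ((LongObjectInspector) f.get(0).getFieldObjectInspector())
            .get(recIdOI.getStructFieldData(recId, f.get(0))); // field 0: original txn
        long rowId = ((LongObjectInspector) f.get(2).getFieldObjectInspector())
            .get(recIdOI.getStructFieldData(recId, f.get(2))); // field 2: row id
        System.out.println("origTxn=" + origTxn + " rowId=" + rowId); // origTxn=7 rowId=42
      }
    }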
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Sep 8 04:38:17 2014
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -908,10 +909,10 @@ class RecordReaderImpl implements Record
}
private static class BinaryTreeReader extends TreeReader{
- private InStream stream;
- private IntegerReader lengths = null;
+ protected InStream stream;
+ protected IntegerReader lengths = null;
- private final LongColumnVector scratchlcv;
+ protected final LongColumnVector scratchlcv;
BinaryTreeReader(Path path, int columnId, Configuration conf) {
super(path, columnId, conf);
@@ -983,7 +984,7 @@ class RecordReaderImpl implements Record
// Read present/isNull stream
super.nextVector(result, batchSize);
- BytesColumnVectorUtil.setRefToOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
+ BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
return result;
}
@@ -1376,12 +1377,13 @@ class RecordReaderImpl implements Record
}
}
+ // This class collects the common logic for reading an ORC vector of byte arrays and
+ // creating the BytesColumnVector.
+ //
private static class BytesColumnVectorUtil {
- // This method has the common code for reading in bytes into a BytesColumnVector.
- // It is used by the BINARY, STRING, CHAR, VARCHAR types.
- public static void setRefToOrcByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv,
- BytesColumnVector result, long batchSize) throws IOException {
+ private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv,
+ BytesColumnVector result, long batchSize) throws IOException {
// Read lengths
scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here...
lengths.nextVector(scratchlcv, batchSize);
@@ -1409,11 +1411,20 @@ class RecordReaderImpl implements Record
}
len -= bytesRead;
offset += bytesRead;
- }
+ }
+
+ return allBytes;
+ }
+
+ // This method has the common code for reading in bytes into a BytesColumnVector.
+ public static void readOrcByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv,
+ BytesColumnVector result, long batchSize) throws IOException {
+
+ byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize);
// Too expensive to figure out 'repeating' by comparisons.
result.isRepeating = false;
- offset = 0;
+ int offset = 0;
if (!scratchlcv.isRepeating) {
for (int i = 0; i < batchSize; i++) {
if (!scratchlcv.isNull[i]) {
@@ -1518,7 +1529,7 @@ class RecordReaderImpl implements Record
// Read present/isNull stream
super.nextVector(result, batchSize);
- BytesColumnVectorUtil.setRefToOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
+ BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
return result;
}
@@ -1734,6 +1745,42 @@ class RecordReaderImpl implements Record
result.enforceMaxLength(maxLength);
return result;
}
+
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ // Get the vector of strings from StringTreeReader, then make a 2nd pass to
+ // adjust down the length (right trim and truncate) if necessary.
+ BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
+
+ int adjustedDownLen;
+ if (result.isRepeating) {
+ if (result.noNulls || !result.isNull[0]) {
+ adjustedDownLen = StringExpr.rightTrimAndTruncate(result.vector[0], result.start[0], result.length[0], maxLength);
+ if (adjustedDownLen < result.length[0]) {
+ result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
+ }
+ }
+ } else {
+ if (result.noNulls){
+ for (int i = 0; i < batchSize; i++) {
+ adjustedDownLen = StringExpr.rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i], maxLength);
+ if (adjustedDownLen < result.length[i]) {
+ result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+ }
+ }
+ } else {
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ adjustedDownLen = StringExpr.rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i], maxLength);
+ if (adjustedDownLen < result.length[i]) {
+ result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
}
private static class VarcharTreeReader extends StringTreeReader {
@@ -1762,6 +1809,42 @@ class RecordReaderImpl implements Record
result.enforceMaxLength(maxLength);
return result;
}
+
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ // Get the vector of strings from StringTreeReader, then make a 2nd pass to
+ // adjust down the length (truncate) if necessary.
+ BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
+
+ int adjustedDownLen;
+ if (result.isRepeating) {
+ if (result.noNulls || !result.isNull[0]) {
+ adjustedDownLen = StringExpr.truncate(result.vector[0], result.start[0], result.length[0], maxLength);
+ if (adjustedDownLen < result.length[0]) {
+ result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
+ }
+ }
+ } else {
+ if (result.noNulls){
+ for (int i = 0; i < batchSize; i++) {
+ adjustedDownLen = StringExpr.truncate(result.vector[i], result.start[i], result.length[i], maxLength);
+ if (adjustedDownLen < result.length[i]) {
+ result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+ }
+ }
+ } else {
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ adjustedDownLen = StringExpr.truncate(result.vector[i], result.start[i], result.length[i], maxLength);
+ if (adjustedDownLen < result.length[i]) {
+ result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
+ }
+ }
+ }
+ }
+ }
+ return result;
+ }
}
private static class StructTreeReader extends TreeReader {
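The two nextVector() overrides above differ only in the second pass: CHAR right-trims trailing blanks and truncates to maxLength, while VARCHAR only truncates. A small sketch of that contrast; the StringExpr call shapes are taken from the hunks above, and the literal values are illustrative:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;

    public class CharVsVarcharTrim {
      public static void main(String[] args) {
        byte[] bytes = "abc  ".getBytes(StandardCharsets.UTF_8); // 5 bytes, 2 trailing blanks
        int charLen = StringExpr.rightTrimAndTruncate(bytes, 0, bytes.length, 10);
        int varcharLen = StringExpr.truncate(bytes, 0, bytes.length, 10);
        System.out.println(charLen);    // expected 3: CHAR drops trailing blanks
        System.out.println(varcharLen); // expected 5: VARCHAR keeps them
      }
    }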