You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mm...@apache.org on 2016/04/10 08:02:32 UTC
[09/16] hive git commit: HIVE-9862 Vectorized execution corrupts
timestamp values (Matt McCline, reviewed by Jason Dere)
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
----------------------------------------------------------------------
diff --git a/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
new file mode 100644
index 0000000..3cdf405
--- /dev/null
+++ b/ql/src/gen/vectorization/UDAFTemplates/VectorUDAFMinMaxTimestamp.txt
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen;
+
+import org.apache.hadoop.hive.common.type.PisaTimestamp;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+* <ClassName>. Vectorized implementation for MIN/MAX aggregates.
+*
+* NOTE: this is a code-generation template. <ClassName>, <DescriptionName>,
+* <DescriptionValue> and <OperatorSymbol> are substituted at build time;
+* <OperatorSymbol> is presumably "<" for MIN and ">" for MAX — confirm against
+* the vectorization code generator that consumes this template.
+*/
+@Description(name = "<DescriptionName>",
+ value = "<DescriptionValue>")
+public class <ClassName> extends VectorAggregateExpression {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * class for storing the current aggregate value.
+ */
+ static private final class Aggregation implements AggregationBuffer {
+
+ private static final long serialVersionUID = 1L;
+
+ // Running MIN/MAX value. Allocated once and updated in place via
+ // pisaTimestampUpdate() to avoid per-row object allocation.
+ transient private final PisaTimestamp value;
+
+ /**
+ * Value is explicitly (re)initialized in reset()
+ */
+ transient private boolean isNull = true;
+
+ public Aggregation() {
+ value = new PisaTimestamp();
+ }
+
+ // Folds column element 'index' into the running aggregate: the first
+ // non-null value is taken unconditionally; afterwards the stored value
+ // is replaced only when it compares <OperatorSymbol> the current one.
+ public void checkValue(TimestampColumnVector colVector, int index) {
+ if (isNull) {
+ isNull = false;
+ colVector.pisaTimestampUpdate(this.value, index);
+ } else if (colVector.compareTo(this.value, index) <OperatorSymbol> 0) {
+ colVector.pisaTimestampUpdate(this.value, index);
+ }
+ }
+
+ @Override
+ public int getVariableSize() {
+ // Fixed-size buffer; variable-size accounting does not apply.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset () {
+ isNull = true;
+ this.value.reset();
+ }
+ }
+
+ // Expression producing the input column; evaluated into the batch before
+ // each aggregation pass.
+ private VectorExpression inputExpression;
+ // Writes the final PisaTimestamp value out as a Hive-typed object.
+ private transient VectorExpressionWriter resultWriter;
+
+ public <ClassName>(VectorExpression inputExpression) {
+ this();
+ this.inputExpression = inputExpression;
+ }
+
+ public <ClassName>() {
+ super();
+ }
+
+ @Override
+ public void init(AggregationDesc desc) throws HiveException {
+ // Build the output writer from the first (and only) aggregate parameter.
+ resultWriter = VectorExpressionWriterFactory.genVectorExpressionWritable(
+ desc.getParameters().get(0));
+ }
+
+ // Looks up this aggregate's buffer within the per-row buffer set
+ // (used by GROUP BY, where each row may belong to a different group).
+ private Aggregation getCurrentAggregationBuffer(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ int row) {
+ VectorAggregationBufferRow mySet = aggregationBufferSets[row];
+ Aggregation myagg = (Aggregation) mySet.getAggregationBuffer(aggregrateIndex);
+ return myagg;
+ }
+
+ // GROUP BY entry point: aggregates the batch, dispatching to a specialized
+ // inner loop based on the repeating/selected/nulls flags of the input column.
+ @Override
+ public void aggregateInputSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ VectorizedRowBatch batch) throws HiveException {
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ inputExpression.evaluate(batch);
+
+ TimestampColumnVector inputColVector = (TimestampColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ if (inputColVector.noNulls) {
+ if (inputColVector.isRepeating) {
+ iterateNoNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batchSize);
+ } else {
+ if (batch.selectedInUse) {
+ iterateNoNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batch.selected, batchSize);
+ } else {
+ iterateNoNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batchSize);
+ }
+ }
+ } else {
+ if (inputColVector.isRepeating) {
+ if (batch.selectedInUse) {
+ iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batchSize, batch.selected, inputColVector.isNull);
+ } else {
+ iterateHasNullsRepeatingWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batchSize, inputColVector.isNull);
+ }
+ } else {
+ if (batch.selectedInUse) {
+ iterateHasNullsSelectionWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batchSize, batch.selected, inputColVector.isNull);
+ } else {
+ iterateHasNullsWithAggregationSelection(
+ aggregationBufferSets, aggregrateIndex,
+ inputColVector, batchSize, inputColVector.isNull);
+ }
+ }
+ }
+ }
+
+ // Repeating, no nulls: fold the single value (index 0) into every row's buffer.
+ private void iterateNoNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ // Repeating use index 0.
+ myagg.checkValue(inputColVector, 0);
+ }
+ }
+
+ // Non-repeating, no nulls, with a selection vector mapping logical row i
+ // to physical column index selection[i].
+ private void iterateNoNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int[] selection,
+ int batchSize) {
+
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColVector, selection[i]);
+ }
+ }
+
+ // Non-repeating, no nulls, no selection vector: dense 1:1 iteration.
+ private void iterateNoNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int batchSize) {
+ for (int i=0; i < batchSize; ++i) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColVector, i);
+ }
+ }
+
+ // Repeating with nulls, selection in use.
+ private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ // NOTE(review): for a repeating vector the null flag normally lives at
+ // isNull[0]; indexing isNull[selection[i]] here may read entries that
+ // were never set for this batch — confirm against ColumnVector semantics.
+ if (!isNull[selection[i]]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ // Repeating use index 0.
+ myagg.checkValue(inputColVector, 0);
+ }
+ }
+
+ }
+
+ // Repeating with nulls, no selection vector.
+ private void iterateHasNullsRepeatingWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ // NOTE(review): same concern as the selection variant — a repeating
+ // vector's null flag is conventionally isNull[0], not isNull[i].
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ // Repeating use index 0.
+ myagg.checkValue(inputColVector, 0);
+ }
+ }
+ }
+
+ // Non-repeating with nulls and a selection vector: j walks the selection,
+ // i is the physical column index; the buffer row is keyed by logical j.
+ private void iterateHasNullsSelectionWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ int[] selection,
+ boolean[] isNull) {
+
+ for (int j=0; j < batchSize; ++j) {
+ int i = selection[j];
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ j);
+ myagg.checkValue(inputColVector, i);
+ }
+ }
+ }
+
+ // Non-repeating with nulls, dense iteration.
+ private void iterateHasNullsWithAggregationSelection(
+ VectorAggregationBufferRow[] aggregationBufferSets,
+ int aggregrateIndex,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for (int i=0; i < batchSize; ++i) {
+ if (!isNull[i]) {
+ Aggregation myagg = getCurrentAggregationBuffer(
+ aggregationBufferSets,
+ aggregrateIndex,
+ i);
+ myagg.checkValue(inputColVector, i);
+ }
+ }
+ }
+
+ // Non-GROUP-BY entry point: fold the whole batch into one aggregation buffer.
+ @Override
+ public void aggregateInput(AggregationBuffer agg, VectorizedRowBatch batch)
+ throws HiveException {
+
+ inputExpression.evaluate(batch);
+
+ TimestampColumnVector inputColVector = (TimestampColumnVector)batch.
+ cols[this.inputExpression.getOutputColumn()];
+
+ int batchSize = batch.size;
+
+ if (batchSize == 0) {
+ return;
+ }
+
+ Aggregation myagg = (Aggregation)agg;
+
+ if (inputColVector.isRepeating) {
+ // NOTE(review): this guards on noNulls only; a repeating vector with
+ // noNulls == false is treated as all-null regardless of isNull[0] —
+ // presumably intentional for repeating vectors, but confirm.
+ if (inputColVector.noNulls &&
+ (myagg.isNull || (inputColVector.compareTo(myagg.value, 0) <OperatorSymbol> 0))) {
+ myagg.isNull = false;
+ inputColVector.pisaTimestampUpdate(myagg.value, 0);
+ }
+ return;
+ }
+
+ if (!batch.selectedInUse && inputColVector.noNulls) {
+ iterateNoSelectionNoNulls(myagg, inputColVector, batchSize);
+ }
+ else if (!batch.selectedInUse) {
+ iterateNoSelectionHasNulls(myagg, inputColVector,
+ batchSize, inputColVector.isNull);
+ }
+ else if (inputColVector.noNulls){
+ iterateSelectionNoNulls(myagg, inputColVector, batchSize, batch.selected);
+ }
+ else {
+ iterateSelectionHasNulls(myagg, inputColVector,
+ batchSize, inputColVector.isNull, batch.selected);
+ }
+ }
+
+ // Selection vector in use, nulls possible.
+ private void iterateSelectionHasNulls(
+ Aggregation myagg,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ boolean[] isNull,
+ int[] selected) {
+
+ for (int j=0; j< batchSize; ++j) {
+ int i = selected[j];
+ if (!isNull[i]) {
+ if (myagg.isNull) {
+ myagg.isNull = false;
+ inputColVector.pisaTimestampUpdate(myagg.value, i);
+ }
+ else if (inputColVector.compareTo(myagg.value, i) <OperatorSymbol> 0) {
+ inputColVector.pisaTimestampUpdate(myagg.value, i);
+ }
+ }
+ }
+ }
+
+ // Selection vector in use, no nulls: seed from the first selected element,
+ // then scan (the seed element is re-compared once, which is harmless).
+ private void iterateSelectionNoNulls(
+ Aggregation myagg,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ int[] selected) {
+
+ if (myagg.isNull) {
+ inputColVector.pisaTimestampUpdate(myagg.value, selected[0]);
+ myagg.isNull = false;
+ }
+
+ for (int i=0; i< batchSize; ++i) {
+ int sel = selected[i];
+ if (inputColVector.compareTo(myagg.value, sel) <OperatorSymbol> 0) {
+ inputColVector.pisaTimestampUpdate(myagg.value, sel);
+ }
+ }
+ }
+
+ // Dense iteration, nulls possible.
+ private void iterateNoSelectionHasNulls(
+ Aggregation myagg,
+ TimestampColumnVector inputColVector,
+ int batchSize,
+ boolean[] isNull) {
+
+ for(int i=0;i<batchSize;++i) {
+ if (!isNull[i]) {
+ if (myagg.isNull) {
+ inputColVector.pisaTimestampUpdate(myagg.value, i);
+ myagg.isNull = false;
+ }
+ else if (inputColVector.compareTo(myagg.value, i) <OperatorSymbol> 0) {
+ inputColVector.pisaTimestampUpdate(myagg.value, i);
+ }
+ }
+ }
+ }
+
+ // Dense iteration, no nulls: seed from element 0, then scan.
+ private void iterateNoSelectionNoNulls(
+ Aggregation myagg,
+ TimestampColumnVector inputColVector,
+ int batchSize) {
+ if (myagg.isNull) {
+ inputColVector.pisaTimestampUpdate(myagg.value, 0);
+ myagg.isNull = false;
+ }
+
+ for (int i=0;i<batchSize;++i) {
+ if (inputColVector.compareTo(myagg.value, i) <OperatorSymbol> 0) {
+ inputColVector.pisaTimestampUpdate(myagg.value, i);
+ }
+ }
+ }
+
+ @Override
+ public AggregationBuffer getNewAggregationBuffer() throws HiveException {
+ return new Aggregation();
+ }
+
+ @Override
+ public void reset(AggregationBuffer agg) throws HiveException {
+ Aggregation myAgg = (Aggregation) agg;
+ myAgg.reset();
+ }
+
+ // Returns the aggregate result, or null when no non-null input was seen.
+ @Override
+ public Object evaluateOutput(
+ AggregationBuffer agg) throws HiveException {
+ Aggregation myagg = (Aggregation) agg;
+ if (myagg.isNull) {
+ return null;
+ }
+ else {
+ return resultWriter.writeValue(myagg.value);
+ }
+ }
+
+ @Override
+ public ObjectInspector getOutputObjectInspector() {
+ return resultWriter.getObjectInspector();
+ }
+
+ @Override
+ public int getAggregationBufferFixedSize() {
+ // NOTE(review): this counts the Aggregation object plus one primitive;
+ // the nested PisaTimestamp instance does not appear to be accounted
+ // for — confirm whether that is intentional for this estimate.
+ JavaDataModel model = JavaDataModel.get();
+ return JavaDataModel.alignUp(
+ model.object() +
+ model.primitive2(),
+ model.memoryAlign());
+ }
+
+ public VectorExpression getInputExpression() {
+ return inputExpression;
+ }
+
+ public void setInputExpression(VectorExpression inputExpression) {
+ this.inputExpression = inputExpression;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java
index 95dbf8d..5de055c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampUtils.java
@@ -18,50 +18,24 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.sql.Timestamp;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
public final class TimestampUtils {
- /**
- * Store the given timestamp in nanoseconds into the timestamp object.
- * @param timeInNanoSec Given timestamp in nanoseconds
- * @param t The timestamp object
- */
- public static void assignTimeInNanoSec(long timeInNanoSec, Timestamp t) {
- /*
- * java.sql.Timestamp consists of a long variable to store milliseconds and an integer variable for nanoseconds.
- * The long variable is used to store only the full seconds converted to millis. For example for 1234 milliseconds,
- * 1000 is stored in the long variable, and 234000000 (234 converted to nanoseconds) is stored as nanoseconds.
- * The negative timestamps are also supported, but nanoseconds must be positive therefore millisecond part is
- * reduced by one second.
- */
- long integralSecInMillis = (timeInNanoSec / 1000000000) * 1000; // Full seconds converted to millis.
- long nanos = timeInNanoSec % 1000000000; // The nanoseconds.
- if (nanos < 0) {
- nanos = 1000000000 + nanos; // The positive nano-part that will be added to milliseconds.
- integralSecInMillis = ((timeInNanoSec / 1000000000) - 1) * 1000; // Reduce by one second.
- }
- t.setTime(integralSecInMillis);
- t.setNanos((int) nanos);
- }
-
- public static long getTimeNanoSec(Timestamp t) {
- long time = t.getTime();
- int nanos = t.getNanos();
- return (time * 1000000) + (nanos % 1000000);
- }
+ static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+ static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
- public static long secondsToNanoseconds(long seconds) {
- return seconds * 1000000000;
- }
-
- public static long doubleToNanoseconds(double d) {
- return (long) (d * 1000000000);
+ public static long daysToNanoseconds(long daysSinceEpoch) {
+ return DateWritable.daysToMillis((int) daysSinceEpoch) * NANOSECONDS_PER_MILLISECOND;
}
- public static long daysToNanoseconds(long daysSinceEpoch) {
- return DateWritable.daysToMillis((int) daysSinceEpoch) * 1000000;
+ public static TimestampWritable timestampColumnVectorWritable(
+ TimestampColumnVector timestampColVector, int elementNum,
+ TimestampWritable timestampWritable) {
+ timestampWritable.set(timestampColVector.asScratchTimestamp(elementNum));
+ return timestampWritable;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
index d69454f..965c027 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.sql.Timestamp;
import java.util.List;
import org.slf4j.Logger;
@@ -234,7 +233,26 @@ public abstract class VectorAssignRow {
}
}
- private class TimestampAssigner extends AbstractLongAssigner {
+ private abstract class AbstractTimestampAssigner extends Assigner {
+
+ protected TimestampColumnVector colVector;
+
+ AbstractTimestampAssigner(int columnIndex) {
+ super(columnIndex);
+ }
+
+ @Override
+ void setColumnVector(VectorizedRowBatch batch) {
+ colVector = (TimestampColumnVector) batch.cols[columnIndex];
+ }
+
+ @Override
+ void forgetColumnVector() {
+ colVector = null;
+ }
+ }
+
+ private class TimestampAssigner extends AbstractTimestampAssigner {
TimestampAssigner(int columnIndex) {
super(columnIndex);
@@ -245,9 +263,7 @@ public abstract class VectorAssignRow {
if (object == null) {
VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
} else {
- TimestampWritable tw = (TimestampWritable) object;
- Timestamp t = tw.getTimestamp();
- vector[batchIndex] = TimestampUtils.getTimeNanoSec(t);
+ colVector.set(batchIndex, ((TimestampWritable) object).getTimestamp());
colVector.isNull[batchIndex] = false;
}
}
@@ -272,7 +288,7 @@ public abstract class VectorAssignRow {
}
}
- private class IntervalDayTimeAssigner extends AbstractLongAssigner {
+ private class IntervalDayTimeAssigner extends AbstractTimestampAssigner {
IntervalDayTimeAssigner(int columnIndex) {
super(columnIndex);
@@ -285,7 +301,7 @@ public abstract class VectorAssignRow {
} else {
HiveIntervalDayTimeWritable idtw = (HiveIntervalDayTimeWritable) object;
HiveIntervalDayTime idt = idtw.getHiveIntervalDayTime();
- vector[batchIndex] = DateUtils.getIntervalDayTimeTotalNanos(idt);
+ colVector.set(batchIndex, idt.pisaTimestampUpdate(colVector.useScratchPisaTimestamp()));
colVector.isNull[batchIndex] = false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
index befe2fc..463c8a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
@@ -165,6 +165,17 @@ public class VectorColumnAssignFactory {
}
}
+ private static abstract class VectorTimestampColumnAssign
+ extends VectorColumnAssignVectorBase<TimestampColumnVector> {
+
+ protected void assignTimestamp(Timestamp value, int index) {
+ outCol.set(index, value);
+ }
+ protected void assignTimestamp(TimestampWritable tw, int index) {
+ outCol.set(index, tw.getTimestamp());
+ }
+ }
+
public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch)
throws HiveException {
@@ -313,19 +324,17 @@ public class VectorColumnAssignFactory {
}.init(outputBatch, (LongColumnVector) destCol);
break;
case TIMESTAMP:
- outVCA = new VectorLongColumnAssign() {
+ outVCA = new VectorTimestampColumnAssign() {
@Override
public void assignObjectValue(Object val, int destIndex) throws HiveException {
if (val == null) {
assignNull(destIndex);
}
else {
- TimestampWritable bw = (TimestampWritable) val;
- Timestamp t = bw.getTimestamp();
- assignLong(TimestampUtils.getTimeNanoSec(t), destIndex);
+ assignTimestamp((TimestampWritable) val, destIndex);
}
}
- }.init(outputBatch, (LongColumnVector) destCol);
+ }.init(outputBatch, (TimestampColumnVector) destCol);
break;
case DATE:
outVCA = new VectorLongColumnAssign() {
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
index 6673509..0949145 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnSetInfo.java
@@ -22,8 +22,6 @@ import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -57,6 +55,11 @@ public class VectorColumnSetInfo {
protected int[] decimalIndices;
/**
+ * indices of TIMESTAMP primitive keys.
+ */
+ protected int[] timestampIndices;
+
+ /**
* Helper class for looking up a key value based on key index.
*/
public class KeyLookupHelper {
@@ -64,11 +67,13 @@ public class VectorColumnSetInfo {
public int doubleIndex;
public int stringIndex;
public int decimalIndex;
+ public int timestampIndex;
private static final int INDEX_UNUSED = -1;
private void resetIndices() {
- this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED;
+ this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex =
+ timestampIndex = INDEX_UNUSED;
}
public void setLong(int index) {
resetIndices();
@@ -89,6 +94,11 @@ public class VectorColumnSetInfo {
resetIndices();
this.decimalIndex = index;
}
+
+ public void setTimestamp(int index) {
+ resetIndices();
+ this.timestampIndex= index;
+ }
}
/**
@@ -103,6 +113,7 @@ public class VectorColumnSetInfo {
protected int doubleIndicesIndex;
protected int stringIndicesIndex;
protected int decimalIndicesIndex;
+ protected int timestampIndicesIndex;
protected VectorColumnSetInfo(int keyCount) {
this.keyCount = keyCount;
@@ -117,6 +128,8 @@ public class VectorColumnSetInfo {
stringIndicesIndex = 0;
decimalIndices = new int[this.keyCount];
decimalIndicesIndex = 0;
+ timestampIndices = new int[this.keyCount];
+ timestampIndicesIndex = 0;
indexLookup = new KeyLookupHelper[this.keyCount];
}
@@ -153,6 +166,12 @@ public class VectorColumnSetInfo {
++decimalIndicesIndex;
break;
+ case TIMESTAMP:
+ timestampIndices[timestampIndicesIndex] = addIndex;
+ indexLookup[addIndex].setTimestamp(timestampIndicesIndex);
+ ++timestampIndicesIndex;
+ break;
+
default:
throw new HiveException("Unexpected column vector type " + columnVectorType);
}
@@ -165,5 +184,6 @@ public class VectorColumnSetInfo {
doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
+ timestampIndices = Arrays.copyOf(timestampIndices, timestampIndicesIndex);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorCopyRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorCopyRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorCopyRow.java
index 6b681b3..73476a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorCopyRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorCopyRow.java
@@ -22,8 +22,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -199,6 +197,32 @@ public class VectorCopyRow {
}
}
+ private class TimestampCopyRow extends CopyRow {
+
+ TimestampCopyRow(int inColumnIndex, int outColumnIndex) {
+ super(inColumnIndex, outColumnIndex);
+ }
+
+ @Override
+ void copy(VectorizedRowBatch inBatch, int inBatchIndex, VectorizedRowBatch outBatch, int outBatchIndex) {
+ TimestampColumnVector inColVector = (TimestampColumnVector) inBatch.cols[inColumnIndex];
+ TimestampColumnVector outColVector = (TimestampColumnVector) outBatch.cols[outColumnIndex];
+
+ if (inColVector.isRepeating) {
+ if (inColVector.noNulls || !inColVector.isNull[0]) {
+ outColVector.setElement(outBatchIndex, 0, inColVector);
+ } else {
+ VectorizedBatchUtil.setNullColIsNullValue(outColVector, outBatchIndex);
+ }
+ } else {
+ if (inColVector.noNulls || !inColVector.isNull[inBatchIndex]) {
+ outColVector.setElement(outBatchIndex, inBatchIndex, inColVector);
+ } else {
+ VectorizedBatchUtil.setNullColIsNullValue(outColVector, outBatchIndex);
+ }
+ }
+ }
+ }
private CopyRow[] subRowToBatchCopiersByValue;
private CopyRow[] subRowToBatchCopiersByReference;
@@ -222,6 +246,10 @@ public class VectorCopyRow {
copyRowByValue = new LongCopyRow(inputColumn, outputColumn);
break;
+ case TIMESTAMP:
+ copyRowByValue = new TimestampCopyRow(inputColumn, outputColumn);
+ break;
+
case DOUBLE:
copyRowByValue = new DoubleCopyRow(inputColumn, outputColumn);
break;
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
index 9b086b8..50881e7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.vector;
import java.io.EOFException;
import java.io.IOException;
-import java.sql.Timestamp;
import java.util.List;
import org.slf4j.Logger;
@@ -210,7 +209,14 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
}
}
- private class TimestampReader extends AbstractLongReader {
+ private abstract class AbstractTimestampReader extends Reader<T> {
+
+ AbstractTimestampReader(int columnIndex) {
+ super(columnIndex);
+ }
+ }
+
+ private class TimestampReader extends AbstractTimestampReader {
DeserializeRead.ReadTimestampResults readTimestampResults;
@@ -221,17 +227,17 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
@Override
void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
- LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
+ TimestampColumnVector colVector = (TimestampColumnVector) batch.cols[columnIndex];
if (deserializeRead.readCheckNull()) {
VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
} else {
deserializeRead.readTimestamp(readTimestampResults);
- Timestamp t = readTimestampResults.getTimestamp();
- colVector.vector[batchIndex] = TimestampUtils.getTimeNanoSec(t);
+ colVector.set(batchIndex, readTimestampResults.getTimestamp());
colVector.isNull[batchIndex] = false;
}
}
+
}
private class IntervalYearMonthReader extends AbstractLongReader {
@@ -258,7 +264,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
}
}
- private class IntervalDayTimeReader extends AbstractLongReader {
+ private class IntervalDayTimeReader extends AbstractTimestampReader {
DeserializeRead.ReadIntervalDayTimeResults readIntervalDayTimeResults;
@@ -269,14 +275,14 @@ public final class VectorDeserializeRow<T extends DeserializeRead> {
@Override
void apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
- LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
+ TimestampColumnVector colVector = (TimestampColumnVector) batch.cols[columnIndex];
if (deserializeRead.readCheckNull()) {
VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex);
} else {
deserializeRead.readIntervalDayTime(readIntervalDayTimeResults);
- HiveIntervalDayTime hidt = readIntervalDayTimeResults.getHiveIntervalDayTime();
- colVector.vector[batchIndex] = DateUtils.getIntervalDayTimeTotalNanos(hidt);
+ HiveIntervalDayTime idt = readIntervalDayTimeResults.getHiveIntervalDayTime();
+ colVector.set(batchIndex, idt.pisaTimestampUpdate(colVector.useScratchPisaTimestamp()));
colVector.isNull[batchIndex] = false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
index e221362..0b9ad55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java
@@ -43,7 +43,7 @@ public class VectorExpressionDescriptor {
// LongColumnVector -->
// INT_FAMILY
// DATE
- // TIMESTAMP
+ // INTERVAL_FAMILY
//
// DoubleColumnVector -->
// FLOAT_FAMILY
@@ -56,6 +56,9 @@ public class VectorExpressionDescriptor {
// CHAR
// VARCHAR
//
+ // TimestampColumnVector -->
+ // TIMESTAMP
+ //
public enum ArgumentType {
NONE (0x000),
INT_FAMILY (0x001),
@@ -71,9 +74,9 @@ public class VectorExpressionDescriptor {
INTERVAL_DAY_TIME (0x200),
DATETIME_FAMILY (DATE.value | TIMESTAMP.value),
INTERVAL_FAMILY (INTERVAL_YEAR_MONTH.value | INTERVAL_DAY_TIME.value),
- INT_TIMESTAMP_FAMILY (INT_FAMILY.value | TIMESTAMP.value),
- INT_INTERVAL_FAMILY (INT_FAMILY.value | INTERVAL_FAMILY.value),
- INT_DATETIME_INTERVAL_FAMILY (INT_FAMILY.value | DATETIME_FAMILY.value | INTERVAL_FAMILY.value),
+ INT_INTERVAL_YEAR_MONTH (INT_FAMILY.value | INTERVAL_YEAR_MONTH.value),
+ INT_DATE_INTERVAL_YEAR_MONTH (INT_FAMILY.value | DATE.value | INTERVAL_YEAR_MONTH.value),
+ TIMESTAMP_INTERVAL_DAY_TIME (TIMESTAMP.value | INTERVAL_DAY_TIME.value),
STRING_DATETIME_FAMILY (STRING_FAMILY.value | DATETIME_FAMILY.value),
ALL_FAMILY (0xFFF);
@@ -146,10 +149,12 @@ public class VectorExpressionDescriptor {
public static String getVectorColumnSimpleName(ArgumentType argType) {
if (argType == INT_FAMILY ||
argType == DATE ||
- argType == TIMESTAMP ||
- argType == INTERVAL_YEAR_MONTH ||
- argType == INTERVAL_DAY_TIME) {
+ argType == INTERVAL_YEAR_MONTH
+ ) {
return "Long";
+ } else if (argType == TIMESTAMP ||
+ argType == INTERVAL_DAY_TIME) {
+ return "Timestamp";
} else if (argType == FLOAT_FAMILY) {
return "Double";
} else if (argType == DECIMAL) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
index 4100bc5..622f4a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
+
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.common.type.PisaTimestamp;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -255,10 +257,29 @@ public abstract class VectorExtractRow {
}
}
- private class TimestampExtractor extends AbstractLongExtractor {
+ private abstract class AbstractTimestampExtractor extends Extractor {
+
+ protected TimestampColumnVector colVector;
+
+ AbstractTimestampExtractor(int columnIndex) {
+ super(columnIndex);
+ }
+
+ @Override
+ void setColumnVector(VectorizedRowBatch batch) {
+ colVector = (TimestampColumnVector) batch.cols[columnIndex];
+ }
+
+ @Override
+ void forgetColumnVector() {
+ colVector = null;
+ }
+ }
+
+ private class TimestampExtractor extends AbstractTimestampExtractor {
+
+ protected Timestamp timestamp;
- private Timestamp timestamp;
-
TimestampExtractor(int columnIndex) {
super(columnIndex);
object = PrimitiveObjectInspectorFactory.writableTimestampObjectInspector.create(new Timestamp(0));
@@ -269,8 +290,7 @@ public abstract class VectorExtractRow {
Object extract(int batchIndex) {
int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
- long value = vector[adjustedIndex];
- TimestampUtils.assignTimeInNanoSec(value, timestamp);
+ colVector.timestampUpdate(timestamp, adjustedIndex);
PrimitiveObjectInspectorFactory.writableTimestampObjectInspector.set(object, timestamp);
return object;
} else {
@@ -282,7 +302,7 @@ public abstract class VectorExtractRow {
private class IntervalYearMonthExtractor extends AbstractLongExtractor {
private HiveIntervalYearMonth hiveIntervalYearMonth;
-
+
IntervalYearMonthExtractor(int columnIndex) {
super(columnIndex);
object = PrimitiveObjectInspectorFactory.writableHiveIntervalYearMonthObjectInspector.create(new HiveIntervalYearMonth(0));
@@ -303,10 +323,10 @@ public abstract class VectorExtractRow {
}
}
- private class IntervalDayTimeExtractor extends AbstractLongExtractor {
+ private class IntervalDayTimeExtractor extends AbstractTimestampExtractor {
private HiveIntervalDayTime hiveIntervalDayTime;
-
+
IntervalDayTimeExtractor(int columnIndex) {
super(columnIndex);
object = PrimitiveObjectInspectorFactory.writableHiveIntervalDayTimeObjectInspector.create(new HiveIntervalDayTime(0, 0));
@@ -317,8 +337,7 @@ public abstract class VectorExtractRow {
Object extract(int batchIndex) {
int adjustedIndex = (colVector.isRepeating ? 0 : batchIndex);
if (colVector.noNulls || !colVector.isNull[adjustedIndex]) {
- long value = vector[adjustedIndex];
- DateUtils.setIntervalDayTimeTotalNanos(hiveIntervalDayTime, value);
+ hiveIntervalDayTime.set(colVector.asScratchPisaTimestamp(adjustedIndex));
PrimitiveObjectInspectorFactory.writableHiveIntervalDayTimeObjectInspector.set(object, hiveIntervalDayTime);
return object;
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
index fabac38..9f0ac11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupKeyHelper.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.io.IOException;
-import java.util.Arrays;
-
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -121,5 +119,17 @@ public class VectorGroupKeyHelper extends VectorColumnSetInfo {
outputColumnVector.isNull[outputBatch.size] = true;
}
}
+ for(int i=0;i<timestampIndices.length; ++i) {
+ int keyIndex = timestampIndices[i];
+ TimestampColumnVector inputColumnVector = (TimestampColumnVector) inputBatch.cols[keyIndex];
+ TimestampColumnVector outputColumnVector = (TimestampColumnVector) outputBatch.cols[keyIndex];
+ if (inputColumnVector.noNulls || !inputColumnVector.isNull[0]) {
+
+ outputColumnVector.setElement(outputBatch.size, 0, inputColumnVector);
+ } else {
+ outputColumnVector.noNulls = false;
+ outputColumnVector.isNull[outputBatch.size] = true;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
index aff3551..b5d8164 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hive.ql.exec.vector;
import java.util.Arrays;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.PisaTimestamp;
import org.apache.hadoop.hive.ql.exec.KeyWrapper;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -42,6 +44,7 @@ public class VectorHashKeyWrapper extends KeyWrapper {
private static final double[] EMPTY_DOUBLE_ARRAY = new double[0];
private static final byte[][] EMPTY_BYTES_ARRAY = new byte[0][];
private static final HiveDecimalWritable[] EMPTY_DECIMAL_ARRAY = new HiveDecimalWritable[0];
+ private static final PisaTimestamp[] EMPTY_TIMESTAMP_ARRAY = new PisaTimestamp[0];
private long[] longValues;
private double[] doubleValues;
@@ -52,14 +55,17 @@ public class VectorHashKeyWrapper extends KeyWrapper {
private HiveDecimalWritable[] decimalValues;
+ private PisaTimestamp[] timestampValues;
+
private boolean[] isNull;
private int hashcode;
public VectorHashKeyWrapper(int longValuesCount, int doubleValuesCount,
- int byteValuesCount, int decimalValuesCount) {
+ int byteValuesCount, int decimalValuesCount, int timestampValuesCount) {
longValues = longValuesCount > 0 ? new long[longValuesCount] : EMPTY_LONG_ARRAY;
doubleValues = doubleValuesCount > 0 ? new double[doubleValuesCount] : EMPTY_DOUBLE_ARRAY;
decimalValues = decimalValuesCount > 0 ? new HiveDecimalWritable[decimalValuesCount] : EMPTY_DECIMAL_ARRAY;
+ timestampValues = timestampValuesCount > 0 ? new PisaTimestamp[timestampValuesCount] : EMPTY_TIMESTAMP_ARRAY;
for(int i = 0; i < decimalValuesCount; ++i) {
decimalValues[i] = new HiveDecimalWritable(HiveDecimal.ZERO);
}
@@ -72,7 +78,11 @@ public class VectorHashKeyWrapper extends KeyWrapper {
byteStarts = EMPTY_INT_ARRAY;
byteLengths = EMPTY_INT_ARRAY;
}
- isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount];
+ for(int i = 0; i < timestampValuesCount; ++i) {
+ timestampValues[i] = new PisaTimestamp();
+ }
+ isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount +
+ decimalValuesCount + timestampValuesCount];
hashcode = 0;
}
@@ -94,6 +104,10 @@ public class VectorHashKeyWrapper extends KeyWrapper {
hashcode ^= decimalValues[i].getHiveDecimal().hashCode();
}
+ for (int i = 0; i < timestampValues.length; i++) {
+ hashcode ^= timestampValues[i].hashCode();
+ }
+
// This code, with branches and all, is not executed if there are no string keys
for (int i = 0; i < byteValues.length; ++i) {
/*
@@ -131,6 +145,7 @@ public class VectorHashKeyWrapper extends KeyWrapper {
Arrays.equals(longValues, keyThat.longValues) &&
Arrays.equals(doubleValues, keyThat.doubleValues) &&
Arrays.equals(decimalValues, keyThat.decimalValues) &&
+ Arrays.equals(timestampValues, keyThat.timestampValues) &&
Arrays.equals(isNull, keyThat.isNull) &&
byteValues.length == keyThat.byteValues.length &&
(0 == byteValues.length || bytesEquals(keyThat));
@@ -196,6 +211,16 @@ public class VectorHashKeyWrapper extends KeyWrapper {
clone.byteStarts = EMPTY_INT_ARRAY;
clone.byteLengths = EMPTY_INT_ARRAY;
}
+ if (timestampValues.length > 0) {
+ clone.timestampValues = new PisaTimestamp[timestampValues.length];
+ for(int i = 0; i < timestampValues.length; ++i) {
+ clone.timestampValues[i] = new PisaTimestamp();
+ clone.timestampValues[i].update(timestampValues[i]);
+ }
+ } else {
+ clone.timestampValues = EMPTY_TIMESTAMP_ARRAY;
+ }
+
clone.hashcode = hashcode;
assert clone.equals(this);
}
@@ -256,14 +281,32 @@ public class VectorHashKeyWrapper extends KeyWrapper {
isNull[longValues.length + doubleValues.length + byteValues.length + index] = true;
}
+ public void assignTimestamp(int index, PisaTimestamp value) {
+ timestampValues[index].update(value);
+ isNull[longValues.length + doubleValues.length + byteValues.length +
+ decimalValues.length + index] = false;
+ }
+
+ public void assignTimestamp(int index, TimestampColumnVector colVector, int elementNum) {
+ colVector.pisaTimestampUpdate(timestampValues[index], elementNum);
+ isNull[longValues.length + doubleValues.length + byteValues.length +
+ decimalValues.length + index] = false;
+ }
+
+ public void assignNullTimestamp(int index) {
+ isNull[longValues.length + doubleValues.length + byteValues.length +
+ decimalValues.length + index] = true;
+ }
+
@Override
public String toString()
{
- return String.format("%d[%s] %d[%s] %d[%s] %d[%s]",
+ return String.format("%d[%s] %d[%s] %d[%s] %d[%s] %d[%s]",
longValues.length, Arrays.toString(longValues),
doubleValues.length, Arrays.toString(doubleValues),
byteValues.length, Arrays.toString(byteValues),
- decimalValues.length, Arrays.toString(decimalValues));
+ decimalValues.length, Arrays.toString(decimalValues),
+ timestampValues.length, Arrays.toString(timestampValues));
}
public boolean getIsLongNull(int i) {
@@ -315,5 +358,15 @@ public class VectorHashKeyWrapper extends KeyWrapper {
public HiveDecimalWritable getDecimal(int i) {
return decimalValues[i];
}
+
+ public boolean getIsTimestampNull(int i) {
+ return isNull[longValues.length + doubleValues.length + byteValues.length +
+ decimalValues.length + i];
+ }
+
+ public PisaTimestamp getTimestamp(int i) {
+ return timestampValues[i];
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
index 6333222..1c34124 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
@@ -18,13 +18,11 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.util.Arrays;
-
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
/**
* Class for handling vectorized hash map key wrappers. It evaluates the key columns in a
@@ -157,27 +155,49 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
}
}
for(int i=0;i<decimalIndices.length; ++i) {
- int keyIndex = decimalIndices[i];
- int columnIndex = keyExpressions[keyIndex].getOutputColumn();
- DecimalColumnVector columnVector = (DecimalColumnVector) batch.cols[columnIndex];
- if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
- assignDecimalNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
- } else if (columnVector.noNulls && !columnVector.isRepeating && batch.selectedInUse) {
- assignDecimalNoNullsNoRepeatingSelection(i, batch.size, columnVector, batch.selected);
- } else if (columnVector.noNulls && columnVector.isRepeating) {
- assignDecimalNoNullsRepeating(i, batch.size, columnVector);
- } else if (!columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
- assignDecimalNullsNoRepeatingNoSelection(i, batch.size, columnVector);
- } else if (!columnVector.noNulls && columnVector.isRepeating) {
- assignDecimalNullsRepeating(i, batch.size, columnVector);
- } else if (!columnVector.noNulls && !columnVector.isRepeating && batch.selectedInUse) {
- assignDecimalNullsNoRepeatingSelection (i, batch.size, columnVector, batch.selected);
- } else {
- throw new HiveException (String.format(
- "Unimplemented Decimal null/repeat/selected combination %b/%b/%b",
- columnVector.noNulls, columnVector.isRepeating, batch.selectedInUse));
- }
+ int keyIndex = decimalIndices[i];
+ int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+ DecimalColumnVector columnVector = (DecimalColumnVector) batch.cols[columnIndex];
+ if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
+ assignDecimalNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
+ } else if (columnVector.noNulls && !columnVector.isRepeating && batch.selectedInUse) {
+ assignDecimalNoNullsNoRepeatingSelection(i, batch.size, columnVector, batch.selected);
+ } else if (columnVector.noNulls && columnVector.isRepeating) {
+ assignDecimalNoNullsRepeating(i, batch.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
+ assignDecimalNullsNoRepeatingNoSelection(i, batch.size, columnVector);
+ } else if (!columnVector.noNulls && columnVector.isRepeating) {
+ assignDecimalNullsRepeating(i, batch.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && batch.selectedInUse) {
+ assignDecimalNullsNoRepeatingSelection (i, batch.size, columnVector, batch.selected);
+ } else {
+ throw new HiveException (String.format(
+ "Unimplemented Decimal null/repeat/selected combination %b/%b/%b",
+ columnVector.noNulls, columnVector.isRepeating, batch.selectedInUse));
}
+ }
+ for(int i=0;i<timestampIndices.length; ++i) {
+ int keyIndex = timestampIndices[i];
+ int columnIndex = keyExpressions[keyIndex].getOutputColumn();
+ TimestampColumnVector columnVector = (TimestampColumnVector) batch.cols[columnIndex];
+ if (columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
+ assignTimestampNoNullsNoRepeatingNoSelection(i, batch.size, columnVector);
+ } else if (columnVector.noNulls && !columnVector.isRepeating && batch.selectedInUse) {
+ assignTimestampNoNullsNoRepeatingSelection(i, batch.size, columnVector, batch.selected);
+ } else if (columnVector.noNulls && columnVector.isRepeating) {
+ assignTimestampNoNullsRepeating(i, batch.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && !batch.selectedInUse) {
+ assignTimestampNullsNoRepeatingNoSelection(i, batch.size, columnVector);
+ } else if (!columnVector.noNulls && columnVector.isRepeating) {
+ assignTimestampNullsRepeating(i, batch.size, columnVector);
+ } else if (!columnVector.noNulls && !columnVector.isRepeating && batch.selectedInUse) {
+ assignTimestampNullsNoRepeatingSelection (i, batch.size, columnVector, batch.selected);
+ } else {
+ throw new HiveException (String.format(
+ "Unimplemented timestamp null/repeat/selected combination %b/%b/%b",
+ columnVector.noNulls, columnVector.isRepeating, batch.selectedInUse));
+ }
+ }
for(int i=0;i<batch.size;++i) {
vectorHashKeyWrappers[i].setHashKey();
}
@@ -504,6 +524,79 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
}
/**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Timestamp type, possible nulls, no repeat values, batch selection vector.
+ */
+ private void assignTimestampNullsNoRepeatingSelection(int index, int size,
+ TimestampColumnVector columnVector, int[] selected) {
+ for(int i = 0; i < size; ++i) {
+ int row = selected[i];
+ if (!columnVector.isNull[row]) {
+ vectorHashKeyWrappers[i].assignTimestamp(index, columnVector, row);
+ } else {
+ vectorHashKeyWrappers[i].assignNullTimestamp(index);
+ }
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Timestamp type, repeat null values.
+ */
+ private void assignTimestampNullsRepeating(int index, int size,
+ TimestampColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignNullTimestamp(index);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Timestamp type, possible nulls, repeat values.
+ */
+ private void assignTimestampNullsNoRepeatingNoSelection(int index, int size,
+ TimestampColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ if (!columnVector.isNull[r]) {
+ vectorHashKeyWrappers[r].assignTimestamp(index, columnVector, r);
+ } else {
+ vectorHashKeyWrappers[r].assignNullTimestamp(index);
+ }
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Timestamp type, no nulls, repeat values, no selection vector.
+ */
+ private void assignTimestampNoNullsRepeating(int index, int size, TimestampColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignTimestamp(index, columnVector, 0);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Timestamp type, no nulls, no repeat values, batch selection vector.
+ */
+ private void assignTimestampNoNullsNoRepeatingSelection(int index, int size,
+ TimestampColumnVector columnVector, int[] selected) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignTimestamp(index, columnVector, selected[r]);
+ }
+ }
+
+ /**
+ * Helper method to assign values from a vector column into the key wrapper.
+ * Optimized for Timestamp type, no nulls, no repeat values, no selection vector.
+ */
+ private void assignTimestampNoNullsNoRepeatingNoSelection(int index, int size,
+ TimestampColumnVector columnVector) {
+ for(int r = 0; r < size; ++r) {
+ vectorHashKeyWrappers[r].assignTimestamp(index, columnVector, r);
+ }
+ }
+ /**
* Prepares a VectorHashKeyWrapperBatch to work for a specific set of keys.
* Computes the fast access lookup indices, preallocates all needed internal arrays.
* This step is done only once per query, not once per batch. The information computed now
@@ -544,16 +637,17 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(compiledKeyWrapperBatch.doubleIndices.length);
compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.stringIndices.length);
compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.decimalIndices.length);
+ compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.timestampIndices.length);
compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(compiledKeyWrapperBatch.longIndices.length) * 2;
compiledKeyWrapperBatch.keysFixedSize +=
model.lengthForBooleanArrayOfSize(keyExpressions.length);
return compiledKeyWrapperBatch;
}
-
+
public VectorHashKeyWrapper allocateKeyWrapper() {
return new VectorHashKeyWrapper(longIndices.length, doubleIndices.length,
- stringIndices.length, decimalIndices.length);
+ stringIndices.length, decimalIndices.length, timestampIndices.length);
}
/**
@@ -581,11 +675,16 @@ public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
return kw.getIsDecimalNull(klh.decimalIndex)? null :
keyOutputWriter.writeValue(
kw.getDecimal(klh.decimalIndex).getHiveDecimal());
+ } else if (klh.timestampIndex >= 0) {
+ return kw.getIsTimestampNull(klh.timestampIndex)? null :
+ keyOutputWriter.writeValue(
+ kw.getTimestamp(klh.timestampIndex));
}
else {
throw new HiveException(String.format(
- "Internal inconsistent KeyLookupHelper at index [%d]:%d %d %d %d",
- i, klh.longIndex, klh.doubleIndex, klh.stringIndex, klh.decimalIndex));
+ "Internal inconsistent KeyLookupHelper at index [%d]:%d %d %d %d %d",
+ i, klh.longIndex, klh.doubleIndex, klh.stringIndex, klh.decimalIndex,
+ klh.timestampIndex));
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
index c98c260..dea38e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.sql.Timestamp;
import java.util.List;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.common.type.PisaTimestamp;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -254,7 +256,7 @@ public final class VectorSerializeRow<T extends SerializeWrite> {
}
}
- private class TimestampWriter extends AbstractLongWriter {
+ private class TimestampWriter extends Writer {
Timestamp scratchTimestamp;
@@ -265,11 +267,11 @@ public final class VectorSerializeRow<T extends SerializeWrite> {
@Override
boolean apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
- LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
+ TimestampColumnVector colVector = (TimestampColumnVector) batch.cols[columnIndex];
if (colVector.isRepeating) {
if (colVector.noNulls || !colVector.isNull[0]) {
- TimestampUtils.assignTimeInNanoSec(colVector.vector[0], scratchTimestamp);
+ colVector.timestampUpdate(scratchTimestamp, 0);
serializeWrite.writeTimestamp(scratchTimestamp);
return true;
} else {
@@ -278,7 +280,7 @@ public final class VectorSerializeRow<T extends SerializeWrite> {
}
} else {
if (colVector.noNulls || !colVector.isNull[batchIndex]) {
- TimestampUtils.assignTimeInNanoSec(colVector.vector[batchIndex], scratchTimestamp);
+ colVector.timestampUpdate(scratchTimestamp, batchIndex);
serializeWrite.writeTimestamp(scratchTimestamp);
return true;
} else {
@@ -319,19 +321,23 @@ public final class VectorSerializeRow<T extends SerializeWrite> {
}
}
- private class IntervalDayTimeWriter extends AbstractLongWriter {
+ private class IntervalDayTimeWriter extends Writer {
+
+ private HiveIntervalDayTime hiveIntervalDayTime;
IntervalDayTimeWriter(int columnIndex) {
super(columnIndex);
+ hiveIntervalDayTime = new HiveIntervalDayTime();
}
@Override
boolean apply(VectorizedRowBatch batch, int batchIndex) throws IOException {
- LongColumnVector colVector = (LongColumnVector) batch.cols[columnIndex];
+ TimestampColumnVector colVector = (TimestampColumnVector) batch.cols[columnIndex];
if (colVector.isRepeating) {
if (colVector.noNulls || !colVector.isNull[0]) {
- serializeWrite.writeHiveIntervalDayTime(colVector.vector[0]);
+ hiveIntervalDayTime.set(colVector.asScratchPisaTimestamp(0));
+ serializeWrite.writeHiveIntervalDayTime(hiveIntervalDayTime);
return true;
} else {
serializeWrite.writeNull();
@@ -339,7 +345,8 @@ public final class VectorSerializeRow<T extends SerializeWrite> {
}
} else {
if (colVector.noNulls || !colVector.isNull[batchIndex]) {
- serializeWrite.writeHiveIntervalDayTime(colVector.vector[batchIndex]);
+ hiveIntervalDayTime.set(colVector.asScratchPisaTimestamp(batchIndex));
+ serializeWrite.writeHiveIntervalDayTime(hiveIntervalDayTime);
return true;
} else {
serializeWrite.writeNull();
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 7e95244..dd59bf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -50,22 +50,30 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.CastTimestampToDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgTimestamp;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountMerge;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCountStar;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFStdPopTimestamp;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFStdSampTimestamp;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFSumDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFVarPopTimestamp;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFVarSampTimestamp;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFAvgLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxString;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMaxTimestamp;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinLong;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinString;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFMinTimestamp;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopDouble;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.gen.VectorUDAFStdPopLong;
@@ -929,20 +937,16 @@ public class VectorizationContext {
case DATE:
return new ConstantVectorExpression(outCol, DateWritable.dateToDays((Date) constantValue));
case TIMESTAMP:
- return new ConstantVectorExpression(outCol, TimestampUtils.getTimeNanoSec((Timestamp) constantValue));
+ return new ConstantVectorExpression(outCol, (Timestamp) constantValue);
case INTERVAL_YEAR_MONTH:
return new ConstantVectorExpression(outCol,
((HiveIntervalYearMonth) constantValue).getTotalMonths());
case INTERVAL_DAY_TIME:
- return new ConstantVectorExpression(outCol,
- DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) constantValue));
+ return new ConstantVectorExpression(outCol, (HiveIntervalDayTime) constantValue);
case FLOAT_FAMILY:
return new ConstantVectorExpression(outCol, ((Number) constantValue).doubleValue());
case DECIMAL:
- VectorExpression ve = new ConstantVectorExpression(outCol, (HiveDecimal) constantValue);
- // Set type name with decimal precision, scale, etc.
- ve.setOutputType(typeName);
- return ve;
+ return new ConstantVectorExpression(outCol, (HiveDecimal) constantValue, typeName);
case STRING:
return new ConstantVectorExpression(outCol, ((String) constantValue).getBytes());
case CHAR:
@@ -1240,8 +1244,8 @@ public class VectorizationContext {
VectorExpression ve = getVectorExpressionForUdf(udf, udf.getClass(), childExpr, mode, returnType);
// Replace with the milliseconds conversion
- if (!udf.isIntToTimestampInSeconds() && ve instanceof CastLongToTimestampViaLongToLong) {
- ve = createVectorExpression(CastMillisecondsLongToTimestampViaLongToLong.class,
+ if (!udf.isIntToTimestampInSeconds() && ve instanceof CastLongToTimestamp) {
+ ve = createVectorExpression(CastMillisecondsLongToTimestamp.class,
childExpr, Mode.PROJECTION, returnType);
}
@@ -1526,13 +1530,13 @@ public class VectorizationContext {
expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
((ILongInExpr) expr).setInListValues(inVals);
} else if (isTimestampFamily(colType)) {
- cl = (mode == Mode.FILTER ? FilterLongColumnInList.class : LongColumnInList.class);
- long[] inVals = new long[childrenForInList.size()];
+ cl = (mode == Mode.FILTER ? FilterTimestampColumnInList.class : TimestampColumnInList.class);
+ Timestamp[] inVals = new Timestamp[childrenForInList.size()];
for (int i = 0; i != inVals.length; i++) {
inVals[i] = getTimestampScalar(childrenForInList.get(i));
}
expr = createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION, returnType);
- ((ILongInExpr) expr).setInListValues(inVals);
+ ((ITimestampInExpr) expr).setInListValues(inVals);
} else if (isStringFamily(colType)) {
cl = (mode == Mode.FILTER ? FilterStringColumnInList.class : StringColumnInList.class);
byte[][] inVals = new byte[childrenForInList.size()][];
@@ -1834,7 +1838,7 @@ public class VectorizationContext {
if (isIntFamily(inputType)) {
return createVectorExpression(CastLongToDouble.class, childExpr, Mode.PROJECTION, returnType);
} else if (inputType.equals("timestamp")) {
- return createVectorExpression(CastTimestampToDoubleViaLongToDouble.class, childExpr, Mode.PROJECTION,
+ return createVectorExpression(CastTimestampToDouble.class, childExpr, Mode.PROJECTION,
returnType);
} else if (isFloatFamily(inputType)) {
@@ -1978,20 +1982,10 @@ public class VectorizationContext {
cl = FilterCharColumnBetween.class;
} else if (charTypePattern.matcher(colType).matches() && notKeywordPresent) {
cl = FilterCharColumnNotBetween.class;
- } else if (colType.equals("timestamp")) {
-
- // Get timestamp boundary values as longs instead of the expected strings
- long left = getTimestampScalar(childExpr.get(2));
- long right = getTimestampScalar(childExpr.get(3));
- childrenAfterNot = new ArrayList<ExprNodeDesc>();
- childrenAfterNot.add(colExpr);
- childrenAfterNot.add(new ExprNodeConstantDesc(left));
- childrenAfterNot.add(new ExprNodeConstantDesc(right));
- if (notKeywordPresent) {
- cl = FilterLongColumnNotBetween.class;
- } else {
- cl = FilterLongColumnBetween.class;
- }
+ } else if (colType.equals("timestamp") && !notKeywordPresent) {
+ cl = FilterTimestampColumnBetween.class;
+ } else if (colType.equals("timestamp") && notKeywordPresent) {
+ cl = FilterTimestampColumnNotBetween.class;
} else if (isDecimalFamily(colType) && !notKeywordPresent) {
cl = FilterDecimalColumnBetween.class;
} else if (isDecimalFamily(colType) && notKeywordPresent) {
@@ -2056,6 +2050,7 @@ public class VectorizationContext {
// Make vectorized operator
String normalizedName = getNormalizedName(resultTypeName);
+
VectorExpression ve = new VectorUDFAdaptor(expr, outputCol, normalizedName, argDescs);
// Set child expressions
@@ -2173,21 +2168,17 @@ public class VectorizationContext {
VectorExpression.Type type = VectorExpression.Type.getValue(t);
Object scalarValue = getScalarValue(constDesc);
switch (type) {
- case TIMESTAMP:
- return TimestampUtils.getTimeNanoSec((Timestamp) scalarValue);
case DATE:
return DateWritable.dateToDays((Date) scalarValue);
case INTERVAL_YEAR_MONTH:
return ((HiveIntervalYearMonth) scalarValue).getTotalMonths();
- case INTERVAL_DAY_TIME:
- return DateUtils.getIntervalDayTimeTotalNanos((HiveIntervalDayTime) scalarValue);
default:
return scalarValue;
}
}
- // Get a timestamp as a long in number of nanos, from a string constant or cast
- private long getTimestampScalar(ExprNodeDesc expr) throws HiveException {
+ // Get a timestamp from a string constant or cast
+ private Timestamp getTimestampScalar(ExprNodeDesc expr) throws HiveException {
if (expr instanceof ExprNodeGenericFuncDesc &&
((ExprNodeGenericFuncDesc) expr).getGenericUDF() instanceof GenericUDFTimestamp) {
return evaluateCastToTimestamp(expr);
@@ -2215,7 +2206,7 @@ public class VectorizationContext {
+ "Expecting string.");
}
- private long evaluateCastToTimestamp(ExprNodeDesc expr) throws HiveException {
+ private Timestamp evaluateCastToTimestamp(ExprNodeDesc expr) throws HiveException {
ExprNodeGenericFuncDesc expr2 = (ExprNodeGenericFuncDesc) expr;
ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr2);
ObjectInspector output = evaluator.initialize(null);
@@ -2226,7 +2217,7 @@ public class VectorizationContext {
throw new HiveException("Udf: failed to convert to timestamp");
}
Timestamp ts = (Timestamp) java;
- return TimestampUtils.getTimeNanoSec(ts);
+ return ts;
}
private Constructor<?> getConstructor(Class<?> cl) throws HiveException {
@@ -2315,7 +2306,7 @@ public class VectorizationContext {
}
}
- public static ColumnVector.Type getColumnVectorTypeFromTypeInfo(TypeInfo typeInfo) throws HiveException {
+ public static ColumnVector.Type getColumnVectorTypeFromTypeInfo(TypeInfo typeInfo) {
switch (typeInfo.getCategory()) {
case STRUCT:
return Type.STRUCT;
@@ -2336,11 +2327,13 @@ public class VectorizationContext {
case INT:
case LONG:
case DATE:
- case TIMESTAMP:
case INTERVAL_YEAR_MONTH:
- case INTERVAL_DAY_TIME:
return ColumnVector.Type.LONG;
+ case INTERVAL_DAY_TIME:
+ case TIMESTAMP:
+ return ColumnVector.Type.TIMESTAMP;
+
case FLOAT:
case DOUBLE:
return ColumnVector.Type.DOUBLE;
@@ -2369,47 +2362,58 @@ public class VectorizationContext {
// TODO: And, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used.. Right now they are conservatively
// marked map-side (HASH).
static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY, null, VectorUDAFMinLong.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, null, VectorUDAFMinLong.class));
add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class));
add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class));
add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY, null, VectorUDAFMaxLong.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.TIMESTAMP_INTERVAL_DAY_TIME, null, VectorUDAFMinTimestamp.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, null, VectorUDAFMaxLong.class));
add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class));
add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class));
add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.TIMESTAMP_INTERVAL_DAY_TIME, null, VectorUDAFMaxTimestamp.class));
add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_INTERVAL_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATE_INTERVAL_YEAR_MONTH, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.TIMESTAMP_INTERVAL_DAY_TIME, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class));
add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class));
add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class));
add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class));
add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFAvgTimestamp.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFVarPopTimestamp.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFVarPopTimestamp.class));
+ add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class));
add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class));
add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFVarSampTimestamp.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdPopTimestamp.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdPopTimestamp.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdPopTimestamp.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class));
add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class));
add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.TIMESTAMP, GroupByDesc.Mode.HASH, VectorUDAFStdSampTimestamp.class));
}};
public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduceMergePartial)
http://git-wip-us.apache.org/repos/asf/hive/blob/4a479d0c/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 9b90f37..a68d0cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -141,11 +141,12 @@ public class VectorizedBatchUtil {
case SHORT:
case INT:
case LONG:
- case TIMESTAMP:
case DATE:
case INTERVAL_YEAR_MONTH:
- case INTERVAL_DAY_TIME:
return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ case INTERVAL_DAY_TIME:
+ case TIMESTAMP:
+ return new TimestampColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
case FLOAT:
case DOUBLE:
return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
@@ -393,13 +394,12 @@ public class VectorizedBatchUtil {
}
break;
case TIMESTAMP: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[offset + colIndex];
+ TimestampColumnVector lcv = (TimestampColumnVector) batch.cols[offset + colIndex];
if (writableCol != null) {
- Timestamp t = ((TimestampWritable) writableCol).getTimestamp();
- lcv.vector[rowIndex] = TimestampUtils.getTimeNanoSec(t);
+ lcv.set(rowIndex, ((TimestampWritable) writableCol).getTimestamp());
lcv.isNull[rowIndex] = false;
} else {
- lcv.vector[rowIndex] = 1;
+ lcv.setNullValue(rowIndex);
setNullColIsNullValue(lcv, rowIndex);
}
}
@@ -583,6 +583,8 @@ public class VectorizedBatchUtil {
return new DecimalColumnVector(decColVector.vector.length,
decColVector.precision,
decColVector.scale);
+ } else if (source instanceof TimestampColumnVector) {
+ return new TimestampColumnVector(((TimestampColumnVector) source).getLength());
} else if (source instanceof ListColumnVector) {
ListColumnVector src = (ListColumnVector) source;
ColumnVector child = cloneColumnVector(src.child);
@@ -682,6 +684,10 @@ public class VectorizedBatchUtil {
}
} else if (colVector instanceof DecimalColumnVector) {
sb.append(((DecimalColumnVector) colVector).vector[index].toString());
+ } else if (colVector instanceof TimestampColumnVector) {
+ Timestamp timestamp = new Timestamp(0);
+ ((TimestampColumnVector) colVector).timestampUpdate(timestamp, index);
+ sb.append(timestamp.toString());
} else {
sb.append("Unknown");
}