Posted to commits@hive.apache.org by br...@apache.org on 2014/11/07 21:41:45 UTC
svn commit: r1637444 [6/20] - in /hive/branches/spark: ./
cli/src/test/org/apache/hadoop/hive/cli/ common/
common/src/java/org/apache/hadoop/hive/common/type/
common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/conf/ com...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java Fri Nov 7 20:41:34 2014
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.exec.v
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.hive.common.type.Decimal128;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.DecimalUtil;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hive.common.util.Decimal128FastBuffer;
/**
* Generated from template VectorUDAFAvg.txt.
@@ -57,24 +56,45 @@ public class VectorUDAFAvgDecimal extend
private static final long serialVersionUID = 1L;
- transient private final Decimal128 sum = new Decimal128();
+ transient private final HiveDecimalWritable sum = new HiveDecimalWritable();
transient private long count;
transient private boolean isNull;
- public void sumValueWithCheck(Decimal128 value, short scale) {
+ // We use this to catch overflow.
+ transient private boolean isOutOfRange;
+
+ public void sumValueWithNullCheck(HiveDecimalWritable writable, short scale) {
+ if (isOutOfRange) {
+ return;
+ }
+ HiveDecimal value = writable.getHiveDecimal();
if (isNull) {
- sum.update(value);
- sum.changeScaleDestructive(scale);
+ sum.set(value);
count = 1;
isNull = false;
} else {
- sum.addDestructive(value, scale);
+ HiveDecimal result;
+ try {
+ result = sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ isOutOfRange = true;
+ return;
+ }
+ sum.set(result);
count++;
}
}
- public void sumValueNoCheck(Decimal128 value, short scale) {
- sum.addDestructive(value, scale);
+ public void sumValueNoNullCheck(HiveDecimalWritable writable, short scale) {
+ HiveDecimal value = writable.getHiveDecimal();
+ HiveDecimal result;
+ try {
+ result = sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ isOutOfRange = true;
+ return;
+ }
+ sum.set(result);
count++;
}
@@ -87,7 +107,8 @@ public class VectorUDAFAvgDecimal extend
@Override
public void reset() {
isNull = true;
- sum.zeroClear();
+ isOutOfRange = false;
+ sum.set(HiveDecimal.ZERO);
count = 0L;
}
}
@@ -98,8 +119,6 @@ public class VectorUDAFAvgDecimal extend
transient private HiveDecimalWritable resultSum;
transient private StructObjectInspector soi;
- transient private final Decimal128FastBuffer scratch;
-
/**
* The scale of the SUM in the partial output
*/
@@ -120,12 +139,6 @@ public class VectorUDAFAvgDecimal extend
*/
private short inputPrecision;
- /**
- * A value used as scratch to avoid allocating at runtime.
- * Needed by computations like vector[0] * batchSize
- */
- transient private Decimal128 scratchDecimal = new Decimal128();
-
public VectorUDAFAvgDecimal(VectorExpression inputExpression) {
this();
this.inputExpression = inputExpression;
@@ -138,7 +151,6 @@ public class VectorUDAFAvgDecimal extend
resultSum = new HiveDecimalWritable();
partialResult[0] = resultCount;
partialResult[1] = resultSum;
- scratch = new Decimal128FastBuffer();
}
@@ -185,7 +197,7 @@ public class VectorUDAFAvgDecimal extend
DecimalColumnVector inputVector = ( DecimalColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- Decimal128[] vector = inputVector.vector;
+ HiveDecimalWritable[] vector = inputVector.vector;
if (inputVector.noNulls) {
if (inputVector.isRepeating) {
@@ -231,7 +243,7 @@ public class VectorUDAFAvgDecimal extend
private void iterateNoNullsRepeatingWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128 value,
+ HiveDecimalWritable value,
int batchSize) {
for (int i=0; i < batchSize; ++i) {
@@ -239,14 +251,14 @@ public class VectorUDAFAvgDecimal extend
aggregationBufferSets,
bufferIndex,
i);
- myagg.sumValueWithCheck(value, this.sumScale);
+ myagg.sumValueWithNullCheck(value, this.sumScale);
}
}
private void iterateNoNullsSelectionWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
int[] selection,
int batchSize) {
@@ -255,28 +267,28 @@ public class VectorUDAFAvgDecimal extend
aggregationBufferSets,
bufferIndex,
i);
- myagg.sumValueWithCheck(values[selection[i]], this.sumScale);
+ myagg.sumValueWithNullCheck(values[selection[i]], this.sumScale);
}
}
private void iterateNoNullsWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
int batchSize) {
for (int i=0; i < batchSize; ++i) {
Aggregation myagg = getCurrentAggregationBuffer(
aggregationBufferSets,
bufferIndex,
i);
- myagg.sumValueWithCheck(values[i], this.sumScale);
+ myagg.sumValueWithNullCheck(values[i], this.sumScale);
}
}
private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128 value,
+ HiveDecimalWritable value,
int batchSize,
int[] selection,
boolean[] isNull) {
@@ -287,7 +299,7 @@ public class VectorUDAFAvgDecimal extend
aggregationBufferSets,
bufferIndex,
i);
- myagg.sumValueWithCheck(value, this.sumScale);
+ myagg.sumValueWithNullCheck(value, this.sumScale);
}
}
@@ -296,7 +308,7 @@ public class VectorUDAFAvgDecimal extend
private void iterateHasNullsRepeatingWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128 value,
+ HiveDecimalWritable value,
int batchSize,
boolean[] isNull) {
@@ -306,7 +318,7 @@ public class VectorUDAFAvgDecimal extend
aggregationBufferSets,
bufferIndex,
i);
- myagg.sumValueWithCheck(value, this.sumScale);
+ myagg.sumValueWithNullCheck(value, this.sumScale);
}
}
}
@@ -314,7 +326,7 @@ public class VectorUDAFAvgDecimal extend
private void iterateHasNullsSelectionWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
int batchSize,
int[] selection,
boolean[] isNull) {
@@ -326,7 +338,7 @@ public class VectorUDAFAvgDecimal extend
aggregationBufferSets,
bufferIndex,
j);
- myagg.sumValueWithCheck(values[i], this.sumScale);
+ myagg.sumValueWithNullCheck(values[i], this.sumScale);
}
}
}
@@ -334,7 +346,7 @@ public class VectorUDAFAvgDecimal extend
private void iterateHasNullsWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int bufferIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
int batchSize,
boolean[] isNull) {
@@ -344,7 +356,7 @@ public class VectorUDAFAvgDecimal extend
aggregationBufferSets,
bufferIndex,
i);
- myagg.sumValueWithCheck(values[i], this.sumScale);
+ myagg.sumValueWithNullCheck(values[i], this.sumScale);
}
}
}
@@ -367,18 +379,31 @@ public class VectorUDAFAvgDecimal extend
Aggregation myagg = (Aggregation)agg;
- Decimal128[] vector = inputVector.vector;
+ HiveDecimalWritable[] vector = inputVector.vector;
if (inputVector.isRepeating) {
if (inputVector.noNulls) {
if (myagg.isNull) {
myagg.isNull = false;
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
myagg.count = 0;
}
- scratchDecimal.update(batchSize);
- scratchDecimal.multiplyDestructive(vector[0], vector[0].getScale());
- myagg.sum.update(scratchDecimal);
+ HiveDecimal value = vector[0].getHiveDecimal();
+ HiveDecimal multiple;
+ try {
+ multiple = value.multiply(HiveDecimal.create(batchSize));
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
+ }
+ HiveDecimal result;
+ try {
+ result = myagg.sum.getHiveDecimal().add(multiple);
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
+ }
+ myagg.sum.set(result);
myagg.count += batchSize;
}
return;
@@ -400,7 +425,7 @@ public class VectorUDAFAvgDecimal extend
private void iterateSelectionHasNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
int batchSize,
boolean[] isNull,
int[] selected) {
@@ -408,57 +433,57 @@ public class VectorUDAFAvgDecimal extend
for (int j=0; j< batchSize; ++j) {
int i = selected[j];
if (!isNull[i]) {
- Decimal128 value = vector[i];
- myagg.sumValueWithCheck(value, this.sumScale);
+ HiveDecimalWritable value = vector[i];
+ myagg.sumValueWithNullCheck(value, this.sumScale);
}
}
}
private void iterateSelectionNoNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
int batchSize,
int[] selected) {
if (myagg.isNull) {
myagg.isNull = false;
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
myagg.count = 0;
}
for (int i=0; i< batchSize; ++i) {
- Decimal128 value = vector[selected[i]];
- myagg.sumValueNoCheck(value, this.sumScale);
+ HiveDecimalWritable value = vector[selected[i]];
+ myagg.sumValueNoNullCheck(value, this.sumScale);
}
}
private void iterateNoSelectionHasNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
int batchSize,
boolean[] isNull) {
for(int i=0;i<batchSize;++i) {
if (!isNull[i]) {
- Decimal128 value = vector[i];
- myagg.sumValueWithCheck(value, this.sumScale);
+ HiveDecimalWritable value = vector[i];
+ myagg.sumValueWithNullCheck(value, this.sumScale);
}
}
}
private void iterateNoSelectionNoNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
int batchSize) {
if (myagg.isNull) {
myagg.isNull = false;
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
myagg.count = 0;
}
for (int i=0;i<batchSize;++i) {
- Decimal128 value = vector[i];
- myagg.sumValueNoCheck(value, this.sumScale);
+ HiveDecimalWritable value = vector[i];
+ myagg.sumValueNoNullCheck(value, this.sumScale);
}
}
@@ -477,13 +502,13 @@ public class VectorUDAFAvgDecimal extend
public Object evaluateOutput(
AggregationBuffer agg) throws HiveException {
Aggregation myagg = (Aggregation) agg;
- if (myagg.isNull) {
+ if (myagg.isNull || myagg.isOutOfRange) {
return null;
}
else {
assert(0 < myagg.count);
resultCount.set (myagg.count);
- resultSum.set(HiveDecimal.create(myagg.sum.toBigDecimal()));
+ resultSum.set(myagg.sum.getHiveDecimal());
return partialResult;
}
}
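
The VectorUDAFAvgDecimal change above swaps the mutable Decimal128 accumulator for a HiveDecimalWritable plus an isOutOfRange flag: each add is attempted on immutable HiveDecimal values, an ArithmeticException marks the buffer out of range, and evaluateOutput then returns null. A minimal, self-contained sketch of the same pattern using java.math.BigDecimal, with an explicit 38-digit precision cap standing in for HiveDecimal's overflow behavior (the cap and class names here are illustrative assumptions, not Hive API):

import java.math.BigDecimal;

// Illustrative analog of the new aggregation buffer: accumulate immutably,
// flag overflow instead of propagating the exception, report null once out of range.
class DecimalAvgBuffer {
    private static final int MAX_PRECISION = 38;   // assumed cap, mirrors HiveDecimal

    private BigDecimal sum = BigDecimal.ZERO;
    private long count = 0;
    private boolean isNull = true;
    private boolean isOutOfRange = false;

    void sumValueWithNullCheck(BigDecimal value) {
        if (isOutOfRange) {
            return;                                 // ignore further input once overflowed
        }
        BigDecimal result = isNull ? value : sum.add(value);
        if (result.precision() > MAX_PRECISION) {
            isOutOfRange = true;                    // analogous to catching ArithmeticException
            return;
        }
        sum = result;
        count = isNull ? 1 : count + 1;
        isNull = false;
    }

    BigDecimal average() {
        if (isNull || isOutOfRange) {
            return null;                            // matches evaluateOutput returning null
        }
        return sum.divide(BigDecimal.valueOf(count), java.math.MathContext.DECIMAL128);
    }
}
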
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java Fri Nov 7 20:41:34 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
-import org.apache.hadoop.hive.common.type.Decimal128;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -29,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
@@ -48,15 +48,29 @@ public class VectorUDAFSumDecimal extend
private static final long serialVersionUID = 1L;
- transient private Decimal128 sum = new Decimal128();
+ transient private HiveDecimalWritable sum = new HiveDecimalWritable();
transient private boolean isNull;
- public void sumValue(Decimal128 value, short scale) {
+ // We use this to catch overflow.
+ transient private boolean isOutOfRange;
+
+ public void sumValue(HiveDecimalWritable writable, short scale) {
+ if (isOutOfRange) {
+ return;
+ }
+ HiveDecimal value = writable.getHiveDecimal();
if (isNull) {
- sum.update(value, scale);
+ sum.set(value);
isNull = false;
} else {
- sum.addDestructive(value, scale);
+ HiveDecimal result;
+ try {
+ result = sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ isOutOfRange = true;
+ return;
+ }
+ sum.set(result);
}
}
@@ -68,12 +82,13 @@ public class VectorUDAFSumDecimal extend
@Override
public void reset() {
isNull = true;
- sum.zeroClear();
+ isOutOfRange = false;
+ sum.set(HiveDecimal.ZERO);
}
}
private VectorExpression inputExpression;
- transient private final Decimal128 scratchDecimal;
+ transient private final HiveDecimalWritable scratchDecimal;
public VectorUDAFSumDecimal(VectorExpression inputExpression) {
this();
@@ -82,7 +97,7 @@ public class VectorUDAFSumDecimal extend
public VectorUDAFSumDecimal() {
super();
- scratchDecimal = new Decimal128();
+ scratchDecimal = new HiveDecimalWritable();
}
private Aggregation getCurrentAggregationBuffer(
@@ -110,7 +125,7 @@ public class VectorUDAFSumDecimal extend
DecimalColumnVector inputVector = (DecimalColumnVector)batch.
cols[this.inputExpression.getOutputColumn()];
- Decimal128[] vector = inputVector.vector;
+ HiveDecimalWritable[] vector = inputVector.vector;
if (inputVector.noNulls) {
if (inputVector.isRepeating) {
@@ -163,7 +178,7 @@ public class VectorUDAFSumDecimal extend
private void iterateNoNullsRepeatingWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128 value,
+ HiveDecimalWritable value,
short scale,
int batchSize) {
@@ -179,7 +194,7 @@ public class VectorUDAFSumDecimal extend
private void iterateNoNullsSelectionWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
short scale,
int[] selection,
int batchSize) {
@@ -196,7 +211,7 @@ public class VectorUDAFSumDecimal extend
private void iterateNoNullsWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
short scale,
int batchSize) {
for (int i=0; i < batchSize; ++i) {
@@ -211,7 +226,7 @@ public class VectorUDAFSumDecimal extend
private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128 value,
+ HiveDecimalWritable value,
short scale,
int batchSize,
int[] selection,
@@ -232,7 +247,7 @@ public class VectorUDAFSumDecimal extend
private void iterateHasNullsRepeatingWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128 value,
+ HiveDecimalWritable value,
short scale,
int batchSize,
boolean[] isNull) {
@@ -251,7 +266,7 @@ public class VectorUDAFSumDecimal extend
private void iterateHasNullsSelectionWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
short scale,
int batchSize,
int[] selection,
@@ -272,7 +287,7 @@ public class VectorUDAFSumDecimal extend
private void iterateHasNullsWithAggregationSelection(
VectorAggregationBufferRow[] aggregationBufferSets,
int aggregateIndex,
- Decimal128[] values,
+ HiveDecimalWritable[] values,
short scale,
int batchSize,
boolean[] isNull) {
@@ -305,18 +320,34 @@ public class VectorUDAFSumDecimal extend
}
Aggregation myagg = (Aggregation)agg;
+ if (myagg.isOutOfRange) {
+ return;
+ }
- Decimal128[] vector = inputVector.vector;
+ HiveDecimalWritable[] vector = inputVector.vector;
if (inputVector.isRepeating) {
if ((inputVector.noNulls) || !inputVector.isNull[0]) {
if (myagg.isNull) {
myagg.isNull = false;
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
+ }
+ HiveDecimal value = vector[0].getHiveDecimal();
+ HiveDecimal multiple;
+ try {
+ multiple = value.multiply(HiveDecimal.create(batchSize));
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
}
- scratchDecimal.update(batchSize);
- scratchDecimal.multiplyDestructive(vector[0], inputVector.scale);
- myagg.sum.addDestructive(scratchDecimal, inputVector.scale);
+ HiveDecimal result;
+ try {
+ result = myagg.sum.getHiveDecimal().add(multiple);
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
+ }
+ myagg.sum.set(result);
}
return;
}
@@ -337,7 +368,7 @@ public class VectorUDAFSumDecimal extend
private void iterateSelectionHasNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
short scale,
int batchSize,
boolean[] isNull,
@@ -346,66 +377,94 @@ public class VectorUDAFSumDecimal extend
for (int j=0; j< batchSize; ++j) {
int i = selected[j];
if (!isNull[i]) {
- Decimal128 value = vector[i];
if (myagg.isNull) {
myagg.isNull = false;
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
+ }
+ HiveDecimal value = vector[i].getHiveDecimal();
+ HiveDecimal result;
+ try {
+ result = myagg.sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
}
- myagg.sum.addDestructive(value, scale);
+ myagg.sum.set(result);
}
}
}
private void iterateSelectionNoNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
short scale,
int batchSize,
int[] selected) {
if (myagg.isNull) {
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
myagg.isNull = false;
}
for (int i=0; i< batchSize; ++i) {
- Decimal128 value = vector[selected[i]];
- myagg.sum.addDestructive(value, scale);
+ HiveDecimal value = vector[selected[i]].getHiveDecimal();
+ HiveDecimal result;
+ try {
+ result = myagg.sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
+ }
+ myagg.sum.set(result);
}
}
private void iterateNoSelectionHasNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
short scale,
int batchSize,
boolean[] isNull) {
for(int i=0;i<batchSize;++i) {
if (!isNull[i]) {
- Decimal128 value = vector[i];
if (myagg.isNull) {
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
myagg.isNull = false;
}
- myagg.sum.addDestructive(value, scale);
+ HiveDecimal value = vector[i].getHiveDecimal();
+ HiveDecimal result;
+ try {
+ result = myagg.sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
+ }
+ myagg.sum.set(result);
}
}
}
private void iterateNoSelectionNoNulls(
Aggregation myagg,
- Decimal128[] vector,
+ HiveDecimalWritable[] vector,
short scale,
int batchSize) {
if (myagg.isNull) {
- myagg.sum.zeroClear();
+ myagg.sum.set(HiveDecimal.ZERO);
myagg.isNull = false;
}
for (int i=0;i<batchSize;++i) {
- Decimal128 value = vector[i];
- myagg.sum.addDestructive(value, scale);
+ HiveDecimal value = vector[i].getHiveDecimal();
+ HiveDecimal result;
+ try {
+ result = myagg.sum.getHiveDecimal().add(value);
+ } catch (ArithmeticException e) { // catch on overflow
+ myagg.isOutOfRange = true;
+ return;
+ }
+ myagg.sum.set(result);
}
}
@@ -423,11 +482,11 @@ public class VectorUDAFSumDecimal extend
@Override
public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
Aggregation myagg = (Aggregation) agg;
- if (myagg.isNull) {
+ if (myagg.isNull || myagg.isOutOfRange) {
return null;
}
else {
- return HiveDecimal.create(myagg.sum.toBigDecimal());
+ return myagg.sum.getHiveDecimal();
}
}
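
Both decimal aggregates also keep the repeating-column fast path: when a batch repeats a single value, the contribution is value * batchSize added in one step, with the multiply and the add each guarded for overflow. A BigDecimal sketch of that step (same assumed 38-digit cap as above, purely illustrative):

import java.math.BigDecimal;

class RepeatingBatchSum {
    private static final int MAX_PRECISION = 38;      // assumed cap
    private BigDecimal sum = BigDecimal.ZERO;
    private boolean isOutOfRange = false;

    // Adds `value` repeated `batchSize` times using one multiply and one add.
    void addRepeating(BigDecimal value, int batchSize) {
        if (isOutOfRange) {
            return;
        }
        BigDecimal multiple = value.multiply(BigDecimal.valueOf(batchSize));
        BigDecimal result = sum.add(multiple);
        if (multiple.precision() > MAX_PRECISION || result.precision() > MAX_PRECISION) {
            isOutOfRange = true;                       // stands in for the two try/catch blocks
            return;
        }
        sum = result;
    }
}
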
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java Fri Nov 7 20:41:34 2014
@@ -329,10 +329,10 @@ public class VectorUDFAdaptor extends Ve
} else if (outputOI instanceof WritableHiveDecimalObjectInspector) {
DecimalColumnVector dcv = (DecimalColumnVector) colVec;
if (value instanceof HiveDecimal) {
- dcv.vector[i].update(((HiveDecimal) value).bigDecimalValue());
+ dcv.set(i, (HiveDecimal) value);
} else {
HiveDecimal hd = ((WritableHiveDecimalObjectInspector) outputOI).getPrimitiveJavaObject(value);
- dcv.vector[i].update(hd.bigDecimalValue());
+ dcv.set(i, hd);
}
} else {
throw new RuntimeException("Unhandled object type " + outputOI.getTypeName());
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Fri Nov 7 20:41:34 2014
@@ -74,7 +74,12 @@ public final class HiveFileFormatUtils {
SequenceFileOutputFormat.class, HiveSequenceFileOutputFormat.class);
}
- static String realoutputFormat;
+ private static ThreadLocal<String> tRealOutputFormat = new ThreadLocal<String>() {
+ @Override
+ protected String initialValue() {
+ return null;
+ }
+ };
@SuppressWarnings("unchecked")
private static Map<Class<? extends OutputFormat>, Class<? extends HiveOutputFormat>>
@@ -105,11 +110,9 @@ public final class HiveFileFormatUtils {
}
Class<? extends HiveOutputFormat> result = outputFormatSubstituteMap
.get(origin);
- //register this output format into the map for the first time
- if ((storagehandlerflag == true) && (result == null)) {
+ if ((storagehandlerflag == true) && (result == null || result == HivePassThroughOutputFormat.class)) {
HiveFileFormatUtils.setRealOutputFormatClassName(origin.getName());
result = HivePassThroughOutputFormat.class;
- HiveFileFormatUtils.registerOutputFormatSubstitute((Class<? extends OutputFormat>) origin,HivePassThroughOutputFormat.class);
}
return result;
}
@@ -120,7 +123,7 @@ public final class HiveFileFormatUtils {
@SuppressWarnings("unchecked")
public static String getRealOutputFormatClassName()
{
- return realoutputFormat;
+ return tRealOutputFormat.get();
}
/**
@@ -129,7 +132,7 @@ public final class HiveFileFormatUtils {
public static void setRealOutputFormatClassName(
String destination) {
if (destination != null){
- realoutputFormat = destination;
+ tRealOutputFormat.set(destination);
}
else {
return;
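
The HiveFileFormatUtils change replaces a shared static String (realoutputFormat) with a ThreadLocal, so concurrent queries in the same JVM no longer overwrite each other's pass-through output-format name. The ThreadLocal idiom it uses, reduced to a runnable sketch (class and method names here are illustrative, not Hive's):

// Each thread sees its own value; no synchronization and no cross-query bleed.
class PerThreadOutputFormatName {
    private static final ThreadLocal<String> REAL_OUTPUT_FORMAT = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return null;
        }
    };

    static void set(String className) {
        if (className != null) {
            REAL_OUTPUT_FORMAT.set(className);
        }
    }

    static String get() {
        return REAL_OUTPUT_FORMAT.get();
    }
}
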
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Fri Nov 7 20:41:34 2014
@@ -30,14 +30,17 @@ import org.apache.avro.file.CodecFactory
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
@@ -47,7 +50,9 @@ import org.apache.hadoop.util.Progressab
* Write to an Avro file from a Hive process.
*/
public class AvroContainerOutputFormat
- implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
+ implements HiveOutputFormat<WritableComparable, AvroGenericRecordWritable> {
+
+ public static final Log LOG = LogFactory.getLog(AvroContainerOutputFormat.class);
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
@@ -75,21 +80,62 @@ public class AvroContainerOutputFormat
return new AvroGenericRecordWriter(dfw);
}
- //no records will be emitted from Hive
- @Override
- public RecordWriter<LongWritable, AvroGenericRecordWritable>
- getRecordWriter(FileSystem ignored, JobConf job, String name,
- Progressable progress) {
- return new RecordWriter<LongWritable, AvroGenericRecordWritable>() {
- @Override
- public void write(LongWritable key, AvroGenericRecordWritable value) {
- throw new RuntimeException("Should not be called");
- }
+ class WrapperRecordWriter<K extends Writable,V extends Writable> implements RecordWriter<K, V> {
+ FileSinkOperator.RecordWriter hiveWriter = null;
+ JobConf jobConf;
+ Progressable progressable;
+ String fileName;
+
+ public WrapperRecordWriter(JobConf jobConf, Progressable progressable, String fileName){
+ this.progressable = progressable;
+ this.jobConf = jobConf;
+ this.fileName = fileName;
+ }
+
+ private FileSinkOperator.RecordWriter getHiveWriter() throws IOException {
+ if (this.hiveWriter == null){
+ Properties properties = new Properties();
+ for (AvroSerdeUtils.AvroTableProperties tableProperty : AvroSerdeUtils.AvroTableProperties.values()){
+ String propVal;
+ if((propVal = jobConf.get(tableProperty.getPropName())) != null){
+ properties.put(tableProperty.getPropName(),propVal);
+ }
+ }
+
+ Boolean isCompressed = jobConf.getBoolean("mapreduce.output.fileoutputformat.compress", false);
+ Path path = new Path(this.fileName);
+ if(path.getFileSystem(jobConf).isDirectory(path)){
+ // This path is only potentially encountered during setup
+ // Otherwise, a specific part_xxxx file name is generated and passed in.
+ path = new Path(path,"_dummy");
+ }
- @Override
- public void close(Reporter reporter) {
+ this.hiveWriter = getHiveRecordWriter(jobConf,path,null,isCompressed, properties, progressable);
}
- };
+ return this.hiveWriter;
+ }
+
+ @Override
+ public void write(K key, V value) throws IOException {
+ getHiveWriter().write(value);
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ // Normally, I'd worry about the blanket false being passed in here, and that
+ // it'd need to be integrated into an abort call for an OutputCommitter, but the
+ // underlying recordwriter ignores it and throws it away, so it's irrelevant.
+ getHiveWriter().close(false);
+ }
+
+ }
+
+ //no records will be emitted from Hive
+ @Override
+ public RecordWriter<WritableComparable, AvroGenericRecordWritable>
+ getRecordWriter(FileSystem ignored, JobConf job, String fileName,
+ Progressable progress) throws IOException {
+ return new WrapperRecordWriter<WritableComparable, AvroGenericRecordWritable>(job,progress,fileName);
}
@Override
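
The Avro change replaces the old throw-on-write stub with WrapperRecordWriter, which builds the real Hive writer lazily on first use so that non-Hive callers of getRecordWriter can still emit records. A generic, self-contained sketch of that lazy-delegation shape (the interface and names are invented for illustration):

import java.io.IOException;

interface SimpleWriter {
    void write(String record) throws IOException;
    void close() throws IOException;
}

// Delegate creation is deferred until first use, mirroring WrapperRecordWriter:
// configuration is captured up front, the expensive writer is only built if needed.
class LazyWriter implements SimpleWriter {
    private final java.util.concurrent.Callable<SimpleWriter> factory;
    private SimpleWriter delegate;

    LazyWriter(java.util.concurrent.Callable<SimpleWriter> factory) {
        this.factory = factory;
    }

    private SimpleWriter delegate() throws IOException {
        if (delegate == null) {
            try {
                delegate = factory.call();
            } catch (Exception e) {
                throw new IOException("could not create underlying writer", e);
            }
        }
        return delegate;
    }

    @Override
    public void write(String record) throws IOException {
        delegate().write(record);
    }

    @Override
    public void close() throws IOException {
        // Sketch simplification: like the wrapper above, this builds the writer
        // even if nothing was written, then closes it.
        delegate().close();
    }
}
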
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Nov 7 20:41:34 2014
@@ -1259,12 +1259,9 @@ class RecordReaderImpl implements Record
if (!result.isNull[0]) {
BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
short scaleInData = (short) scaleStream.next();
- result.vector[0].update(bInt, scaleInData);
-
- // Change the scale to match the schema if the scale in data is different.
- if (scale != scaleInData) {
- result.vector[0].changeScaleDestructive((short) scale);
- }
+ HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
+ dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
+ result.set(0, dec);
}
} else {
// result vector has isNull values set, use the same to read scale vector.
@@ -1273,13 +1270,10 @@ class RecordReaderImpl implements Record
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
- result.vector[i].update(bInt, (short) scratchScaleVector.vector[i]);
-
- // Change the scale to match the schema if the scale is less than in data.
- // (HIVE-7373) If scale is bigger, then it leaves the original trailing zeros
- if (scale < scratchScaleVector.vector[i]) {
- result.vector[i].changeScaleDestructive((short) scale);
- }
+ short scaleInData = (short) scratchScaleVector.vector[i];
+ HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
+ dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
+ result.set(i, dec);
}
}
}
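
In RecordReaderImpl the ORC decimal reader now builds a HiveDecimal from the unscaled BigInteger and the scale stored in the data, then calls HiveDecimalUtils.enforcePrecisionScale to fit it to the schema before setting the column vector. A plain-JDK analog of that enforce step using BigDecimal (the rounding mode and the null-on-overflow behavior are assumptions for illustration):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;

class DecimalSchemaFit {
    // Rescale to the schema's scale, then reject values whose digits no longer
    // fit the schema's precision (returning null, i.e. a NULL cell).
    static BigDecimal enforcePrecisionScale(BigInteger unscaled, int scaleInData,
                                            int precision, int scale) {
        BigDecimal dec = new BigDecimal(unscaled, scaleInData);
        dec = dec.setScale(scale, RoundingMode.HALF_UP);
        if (dec.precision() > precision) {
            return null;
        }
        return dec;
    }
}
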
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Fri Nov 7 20:41:34 2014
@@ -18,14 +18,14 @@
package org.apache.hadoop.hive.ql.log;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.session.SessionState;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* PerfLogger.
*
@@ -147,10 +147,37 @@ public class PerfLogger {
}
public Long getStartTime(String method) {
- return startTimes.get(method);
+ long startTime = 0L;
+
+ if (startTimes.containsKey(method)) {
+ startTime = startTimes.get(method);
+ }
+ return startTime;
}
public Long getEndTime(String method) {
- return endTimes.get(method);
+ long endTime = 0L;
+
+ if (endTimes.containsKey(method)) {
+ endTime = endTimes.get(method);
+ }
+ return endTime;
}
+
+ public boolean startTimeHasMethod(String method) {
+ return startTimes.containsKey(method);
+ }
+
+ public boolean endTimeHasMethod(String method) {
+ return endTimes.containsKey(method);
+ }
+
+ public Long getDuration(String method) {
+ long duration = 0;
+ if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
+ duration = endTimes.get(method) - startTimes.get(method);
+ }
+ return duration;
+ }
+
}
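
The PerfLogger change makes getStartTime/getEndTime null-safe and adds getDuration, which yields 0 unless both the start and the end of a method have been recorded. The lookup logic, condensed into a runnable sketch (field and method names follow the diff; the rest is illustrative):

import java.util.HashMap;
import java.util.Map;

class PerfTimes {
    private final Map<String, Long> startTimes = new HashMap<String, Long>();
    private final Map<String, Long> endTimes = new HashMap<String, Long>();

    void start(String method) { startTimes.put(method, System.currentTimeMillis()); }
    void end(String method)   { endTimes.put(method, System.currentTimeMillis()); }

    // 0 means "not measured"; callers never hit a NullPointerException on a missing key.
    long getDuration(String method) {
        if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
            return endTimes.get(method) - startTimes.get(method);
        }
        return 0L;
    }
}
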
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Fri Nov 7 20:41:34 2014
@@ -18,16 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
@@ -39,7 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Jo
import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
@@ -76,6 +65,16 @@ import org.apache.hadoop.hive.ql.plan.pt
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
/**
* Factory for generating the different node processors used by ColumnPruner.
*/
@@ -600,8 +599,7 @@ public final class ColumnPrunerProcFacto
// revert output cols of SEL(*) to ExprNodeColumnDesc
String[] tabcol = rr.reverseLookup(col);
ColumnInfo colInfo = rr.get(tabcol[0], tabcol[1]);
- ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(),
- colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+ ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
colList.add(colExpr);
outputColNames.add(col);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java Fri Nov 7 20:41:34 2014
@@ -31,11 +31,13 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.StatsRulesProcFactory;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
@@ -82,7 +84,8 @@ public class SetReducerParallelism imple
for (Operator<? extends OperatorDesc> sibling:
sink.getChildOperators().get(0).getParentOperators()) {
if (sibling.getStatistics() != null) {
- numberOfBytes += sibling.getStatistics().getDataSize();
+ numberOfBytes = StatsUtils.safeAdd(
+ numberOfBytes, sibling.getStatistics().getDataSize());
} else {
LOG.warn("No stats available from: "+sibling);
}
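
SetReducerParallelism (and several StatsRulesProcFactory spots further down) now route additions and multiplications of size and row estimates through StatsUtils.safeAdd/safeMult so that an overflow saturates instead of wrapping to a negative long. The helpers themselves are not shown in this diff; one plausible, minimal implementation of the idea (purely an assumption about their behavior) is:

// Saturating arithmetic for long-valued statistics: on overflow, clamp to Long.MAX_VALUE
// rather than letting the estimate wrap around to a negative number.
final class SafeMath {
    static long safeAdd(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    static long safeMult(long a, long b) {
        try {
            return Math.multiplyExact(a, b);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    private SafeMath() {}
}
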
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Fri Nov 7 20:41:34 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -164,12 +165,23 @@ public class SkewJoinOptimizer implement
return null;
}
+ // have to create a QBJoinTree for the cloned join operator
+ QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp);
+ QBJoinTree newJoinTree;
+ try {
+ newJoinTree = originJoinTree.clone();
+ } catch (CloneNotSupportedException e) {
+ LOG.debug("QBJoinTree could not be cloned: ", e);
+ return null;
+ }
+
JoinOperator joinOpClone;
if (processSelect) {
joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
} else {
joinOpClone = (JoinOperator)currOpClone;
}
+ parseContext.getJoinContext().put(joinOpClone, newJoinTree);
List<TableScanOperator> tableScanCloneOpsForJoin =
new ArrayList<TableScanOperator>();
@@ -201,6 +213,7 @@ public class SkewJoinOptimizer implement
}
parseContext.getTopOps().put(newAlias, tso);
+ setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso);
}
// Now do a union of the select operators: selectOp and selectOpClone
@@ -610,6 +623,48 @@ public class SkewJoinOptimizer implement
}
}
}
+
+ /**
+ * Set alias in the cloned join tree
+ */
+ private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias,
+ String newAlias, Operator<? extends OperatorDesc> topOp) {
+ cloned.getAliasToOpInfo().remove(origAlias);
+ cloned.getAliasToOpInfo().put(newAlias, topOp);
+ if (origin.getLeftAlias().equals(origAlias)) {
+ cloned.setLeftAlias(null);
+ cloned.setLeftAlias(newAlias);
+ }
+ replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias);
+ replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias);
+ replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias);
+ replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias);
+ replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias);
+ }
+
+ private static void replaceAlias(String[] origin, String[] cloned,
+ String alias, String newAlias) {
+ if (origin == null || cloned == null || origin.length != cloned.length) {
+ return;
+ }
+ for (int i = 0; i < origin.length; i++) {
+ if (origin[i].equals(alias)) {
+ cloned[i] = newAlias;
+ }
+ }
+ }
+
+ private static void replaceAlias(List<String> origin, List<String> cloned,
+ String alias, String newAlias) {
+ if (origin == null || cloned == null || origin.size() != cloned.size()) {
+ return;
+ }
+ for (int i = 0; i < origin.size(); i++) {
+ if (origin.get(i).equals(alias)) {
+ cloned.set(i, newAlias);
+ }
+ }
+ }
}
/* (non-Javadoc)
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Fri Nov 7 20:41:34 2014
@@ -18,14 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -72,8 +66,14 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
/**
* When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization
@@ -157,7 +157,11 @@ public class SortedDynPartitionOptimizer
// the reduce sink key. Since both key columns are not prefix subset
// ReduceSinkDeDuplication will not merge them together resulting in 2 MR jobs.
// To avoid that we will remove the RS (and EX) inserted by enforce bucketing/sorting.
- removeRSInsertedByEnforceBucketing(fsOp);
+ if (!removeRSInsertedByEnforceBucketing(fsOp)) {
+ LOG.debug("Bailing out of sort dynamic partition optimization as some partition columns " +
+ "got constant folded.");
+ return null;
+ }
// unlink connection between FS and its parent
Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
@@ -209,8 +213,7 @@ public class SortedDynPartitionOptimizer
ArrayList<ExprNodeDesc> newValueCols = Lists.newArrayList();
Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
for (ColumnInfo ci : valColInfo) {
- newValueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), ci
- .getTabAlias(), ci.isHiddenVirtualCol()));
+ newValueCols.add(new ExprNodeColumnDesc(ci));
colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1));
}
ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
@@ -263,7 +266,7 @@ public class SortedDynPartitionOptimizer
// Remove RS and EX introduced by enforce bucketing/sorting config
// Convert PARENT -> RS -> EX -> FS to PARENT -> FS
- private void removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
+ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
HiveConf hconf = parseCtx.getConf();
boolean enforceBucketing = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCEBUCKETING);
boolean enforceSorting = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCESORTING);
@@ -298,17 +301,27 @@ public class SortedDynPartitionOptimizer
Operator<? extends OperatorDesc> rsGrandChild = rsChild.getChildOperators().get(0);
if (rsChild instanceof ExtractOperator) {
+ // if schema size cannot be matched, then it could be because of constant folding
+ // converting partition column expression to constant expression. The constant
+ // expression will then get pruned by column pruner since it will not reference to
+ // any columns.
+ if (rsParent.getSchema().getSignature().size() !=
+ rsChild.getSchema().getSignature().size()) {
+ return false;
+ }
rsParent.getChildOperators().clear();
rsParent.getChildOperators().add(rsGrandChild);
rsGrandChild.getParentOperators().clear();
rsGrandChild.getParentOperators().add(rsParent);
parseCtx.removeOpParseCtx(rsToRemove);
parseCtx.removeOpParseCtx(rsChild);
- LOG.info("Removed " + rsParent.getOperatorId() + " and " + rsChild.getOperatorId()
+ LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChild.getOperatorId()
+ " as it was introduced by enforce bucketing/sorting.");
}
}
}
+
+ return true;
}
private List<Integer> getPartitionPositions(DynamicPartitionCtx dpCtx, RowSchema schema) {
@@ -476,8 +489,7 @@ public class SortedDynPartitionOptimizer
for (Integer idx : pos) {
ColumnInfo ci = colInfos.get(idx);
- ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
- ci.getTabAlias(), ci.isHiddenVirtualCol());
+ ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci);
cols.add(encd);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java Fri Nov 7 20:41:34 2014
@@ -75,7 +75,7 @@ public class HiveOptiqUtil {
return vCols;
}
- public static boolean validateASTForCBO(ASTNode ast) {
+ public static boolean validateASTForUnsupportedTokens(ASTNode ast) {
String astTree = ast.toStringTree();
// if any of following tokens are present in AST, bail out
String[] tokens = { "TOK_CHARSETLITERAL","TOK_TABLESPLITSAMPLE" };
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java Fri Nov 7 20:41:34 2014
@@ -108,7 +108,7 @@ public class PartitionPruner {
boolean argsPruned = false;
GenericUDF hiveUDF = SqlFunctionConverter.getHiveUDF(call.getOperator(),
- call.getType());
+ call.getType(), call.operands.size());
if (hiveUDF != null &&
!FunctionRegistry.isDeterministic(hiveUDF)) {
return null;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java Fri Nov 7 20:41:34 2014
@@ -89,17 +89,17 @@ public class ExprNodeConverter extends R
ArrayList<ExprNodeDesc> tmpExprArgs = new ArrayList<ExprNodeDesc>();
tmpExprArgs.addAll(args.subList(0, 2));
gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
- SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), tmpExprArgs);
+ SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), 2), tmpExprArgs);
for (int i = 2; i < call.operands.size(); i++) {
tmpExprArgs = new ArrayList<ExprNodeDesc>();
tmpExprArgs.add(gfDesc);
tmpExprArgs.add(args.get(i));
gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
- SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), tmpExprArgs);
+ SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), 2), tmpExprArgs);
}
} else {
gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
- SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), args);
+ SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), args.size()), args);
}
return gfDesc;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java Fri Nov 7 20:41:34 2014
@@ -17,15 +17,6 @@
*/
package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
@@ -47,6 +38,15 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
/**
* JoinCondTypeCheckProcFactory is used by Optiq planner(CBO) to generate Join Conditions from Join Condition AST.
* Reasons for sub class:
@@ -99,8 +99,7 @@ public class JoinCondTypeCheckProcFactor
if (!qualifiedAccess) {
colInfo = getColInfo(ctx, null, tableOrCol, expr);
// It's a column.
- return new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(),
- colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+ return new ExprNodeColumnDesc(colInfo);
} else if (hasTableAlias(ctx, tableOrCol, expr)) {
return null;
} else {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java Fri Nov 7 20:41:34 2014
@@ -98,10 +98,19 @@ public class SqlFunctionConverter {
return getOptiqFn(name, optiqArgTypes, retType);
}
- public static GenericUDF getHiveUDF(SqlOperator op, RelDataType dt) {
+ public static GenericUDF getHiveUDF(SqlOperator op, RelDataType dt, int argsLength) {
String name = reverseOperatorMap.get(op);
- if (name == null)
+ if (name == null) {
name = op.getName();
+ }
+ // Make sure we handle unary + and - correctly.
+ if (argsLength == 1) {
+ if (name == "+") {
+ name = FunctionRegistry.UNARY_PLUS_FUNC_NAME;
+ } else if (name == "-") {
+ name = FunctionRegistry.UNARY_MINUS_FUNC_NAME;
+ }
+ }
FunctionInfo hFn = name != null ? FunctionRegistry.getFunctionInfo(name) : null;
if (hFn == null)
hFn = handleExplicitCast(op, dt);
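
SqlFunctionConverter.getHiveUDF now takes the operand count so that "+" and "-" can be mapped to Hive's unary plus/minus functions when there is only one operand; ExprNodeConverter and PartitionPruner pass that count through. The disambiguation step as a stand-alone sketch (the strings "positive" and "negative" are assumed stand-ins for FunctionRegistry.UNARY_PLUS_FUNC_NAME and UNARY_MINUS_FUNC_NAME):

final class OperatorNameResolver {
    // With two or more operands "+" and "-" are the binary arithmetic operators;
    // with a single operand they must resolve to the unary functions instead.
    static String resolve(String sqlOperatorName, int argCount) {
        if (argCount == 1) {
            if ("+".equals(sqlOperatorName)) {
                return "positive";   // assumed value of FunctionRegistry.UNARY_PLUS_FUNC_NAME
            }
            if ("-".equals(sqlOperatorName)) {
                return "negative";   // assumed value of FunctionRegistry.UNARY_MINUS_FUNC_NAME
            }
        }
        return sqlOperatorName;
    }

    private OperatorNameResolver() {}
}
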
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java Fri Nov 7 20:41:34 2014
@@ -187,7 +187,7 @@ public class TypeConverter {
throw new RuntimeException("Unsupported Type : " + type.getTypeName());
}
- return convertedType;
+ return dtFactory.createTypeWithNullability(convertedType, true);
}
public static RelDataType convert(ListTypeInfo lstType,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Fri Nov 7 20:41:34 2014
@@ -18,13 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -60,6 +53,13 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* GenMRSkewJoinProcessor.
*
@@ -192,9 +192,7 @@ public final class GenMRSkewJoinProcesso
String newColName = i + "_VALUE_" + k; // any name, it does not matter.
ColumnInfo columnInfo = new ColumnInfo(newColName, type, alias.toString(), false);
columnInfos.add(columnInfo);
- newValueExpr.add(new ExprNodeColumnDesc(
- columnInfo.getType(), columnInfo.getInternalName(),
- columnInfo.getTabAlias(), false));
+ newValueExpr.add(new ExprNodeColumnDesc(columnInfo));
if (!first) {
colNames = colNames + ",";
colTypes = colTypes + ",";
@@ -216,9 +214,7 @@ public final class GenMRSkewJoinProcesso
ColumnInfo columnInfo = new ColumnInfo(joinKeys.get(k), TypeInfoFactory
.getPrimitiveTypeInfo(joinKeyTypes.get(k)), alias.toString(), false);
columnInfos.add(columnInfo);
- newKeyExpr.add(new ExprNodeColumnDesc(
- columnInfo.getType(), columnInfo.getInternalName(),
- columnInfo.getTabAlias(), false));
+ newKeyExpr.add(new ExprNodeColumnDesc(columnInfo));
}
newJoinValues.put(alias, newValueExpr);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Fri Nov 7 20:41:34 2014
@@ -18,13 +18,8 @@
package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Re
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -77,8 +73,13 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
public class StatsRulesProcFactory {
@@ -170,7 +171,7 @@ public class StatsRulesProcFactory {
// in case of select(*) the data size does not change
if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
- stats.setDataSize(setMaxIfInvalid(dataSize));
+ stats.setDataSize(dataSize);
}
sop.setStatistics(stats);
@@ -322,8 +323,8 @@ public class StatsRulesProcFactory {
} else if (udf instanceof GenericUDFOPOr) {
// for OR condition independently compute and update stats
for (ExprNodeDesc child : genFunc.getChildren()) {
- newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols,
- fop);
+ newNumRows = StatsUtils.safeAdd(
+ evaluateChildExpr(stats, child, aspCtx, neededCols, fop), newNumRows);
}
} else if (udf instanceof GenericUDFOPNot) {
newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop);
@@ -677,9 +678,9 @@ public class StatsRulesProcFactory {
if (cs != null) {
long ndv = cs.getCountDistint();
if (cs.getNumNulls() > 0) {
- ndv += 1;
+ ndv = StatsUtils.safeAdd(ndv, 1);
}
- ndvProduct *= ndv;
+ ndvProduct = StatsUtils.safeMult(ndvProduct, ndv);
} else {
if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
// the column must be an aggregate column inserted by GBY. We
@@ -714,15 +715,16 @@ public class StatsRulesProcFactory {
if (mapSideHashAgg) {
if (containsGroupingSet) {
// Case 4: column stats, hash aggregation, grouping sets
- cardinality = Math.min((parentNumRows * sizeOfGroupingSet) / 2,
- ndvProduct * parallelism * sizeOfGroupingSet);
+ cardinality = Math.min(
+ (StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet)) / 2,
+ StatsUtils.safeMult(StatsUtils.safeMult(ndvProduct, parallelism), sizeOfGroupingSet));
if (isDebugEnabled) {
LOG.debug("[Case 4] STATS-" + gop.toString() + ": cardinality: " + cardinality);
}
} else {
// Case 3: column stats, hash aggregation, NO grouping sets
- cardinality = Math.min(parentNumRows / 2, ndvProduct * parallelism);
+ cardinality = Math.min(parentNumRows / 2, StatsUtils.safeMult(ndvProduct, parallelism));
if (isDebugEnabled) {
LOG.debug("[Case 3] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -731,7 +733,7 @@ public class StatsRulesProcFactory {
} else {
if (containsGroupingSet) {
// Case 6: column stats, NO hash aggregation, grouping sets
- cardinality = parentNumRows * sizeOfGroupingSet;
+ cardinality = StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet);
if (isDebugEnabled) {
LOG.debug("[Case 6] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -758,7 +760,7 @@ public class StatsRulesProcFactory {
if (containsGroupingSet) {
// Case 8: column stats, grouping sets
- cardinality = Math.min(parentNumRows, ndvProduct * sizeOfGroupingSet);
+ cardinality = Math.min(parentNumRows, StatsUtils.safeMult(ndvProduct, sizeOfGroupingSet));
if (isDebugEnabled) {
LOG.debug("[Case 8] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -789,7 +791,7 @@ public class StatsRulesProcFactory {
if (containsGroupingSet) {
// Case 2: NO column stats, NO hash aggregation, grouping sets
- cardinality = parentNumRows * sizeOfGroupingSet;
+ cardinality = StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet);
if (isDebugEnabled) {
LOG.debug("[Case 2] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -828,7 +830,6 @@ public class StatsRulesProcFactory {
// for those newly added columns
if (!colExprMap.containsKey(ci.getInternalName())) {
String colName = ci.getInternalName();
- colName = StatsUtils.stripPrefixFromColumnName(colName);
String tabAlias = ci.getTabAlias();
String colType = ci.getTypeName();
ColStatistics cs = new ColStatistics(tabAlias, colName, colType);
@@ -902,7 +903,7 @@ public class StatsRulesProcFactory {
long avgKeySize = 0;
for (ColStatistics cs : colStats) {
if (cs != null) {
- numEstimatedRows *= cs.getCountDistint();
+ numEstimatedRows = StatsUtils.safeMult(numEstimatedRows, cs.getCountDistint());
avgKeySize += Math.ceil(cs.getAvgColLen());
}
}
@@ -956,7 +957,7 @@ public class StatsRulesProcFactory {
long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize;
// estimated hash table size
- long estHashTableSize = numEstimatedRows * hashEntrySize;
+ long estHashTableSize = StatsUtils.safeMult(numEstimatedRows, hashEntrySize);
if (estHashTableSize < maxMemHashAgg) {
return true;
@@ -1065,7 +1066,7 @@ public class StatsRulesProcFactory {
// detect if there are multiple attributes in join key
ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
- List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols();
+ List<String> keyExprs = rsOp.getConf().getOutputKeyColumnNames();
numAttr = keyExprs.size();
// infer PK-FK relationship in single attribute join case
@@ -1077,7 +1078,7 @@ public class StatsRulesProcFactory {
ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
Statistics parentStats = parent.getStatistics();
- keyExprs = parent.getConf().getKeyCols();
+ keyExprs = parent.getConf().getOutputKeyColumnNames();
// Parent RS may have column statistics from multiple parents.
// Populate table alias to row count map, this will be used later to
@@ -1096,8 +1097,8 @@ public class StatsRulesProcFactory {
// used to quickly look-up for column statistics of join key.
// TODO: expressions in join condition will be ignored. assign
// internal name for expressions and estimate column statistics for expression.
- List<String> fqCols =
- StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap());
+ List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keyExprs,
+ parent.getColumnExprMap());
joinKeys.put(pos, fqCols);
// get column statistics for all output columns
@@ -1119,7 +1120,6 @@ public class StatsRulesProcFactory {
for (int idx = 0; idx < numAttr; idx++) {
for (Integer i : joinKeys.keySet()) {
String col = joinKeys.get(i).get(idx);
- col = StatsUtils.stripPrefixFromColumnName(col);
ColStatistics cs = joinedColStats.get(col);
if (cs != null) {
perAttrDVs.add(cs.getCountDistint());
@@ -1136,13 +1136,12 @@ public class StatsRulesProcFactory {
denom = getEasedOutDenominator(distinctVals);
} else {
for (Long l : distinctVals) {
- denom *= l;
+ denom = StatsUtils.safeMult(denom, l);
}
}
} else {
for (List<String> jkeys : joinKeys.values()) {
for (String jk : jkeys) {
- jk = StatsUtils.stripPrefixFromColumnName(jk);
ColStatistics cs = joinedColStats.get(jk);
if (cs != null) {
distinctVals.add(cs.getCountDistint());
@@ -1166,7 +1165,6 @@ public class StatsRulesProcFactory {
ExprNodeDesc end = colExprMap.get(key);
if (end instanceof ExprNodeColumnDesc) {
String colName = ((ExprNodeColumnDesc) end).getColumn();
- colName = StatsUtils.stripPrefixFromColumnName(colName);
String tabAlias = ((ExprNodeColumnDesc) end).getTabAlias();
String fqColName = StatsUtils.getFullyQualifiedColumnName(tabAlias, colName);
ColStatistics cs = joinedColStats.get(fqColName);
@@ -1214,13 +1212,13 @@ public class StatsRulesProcFactory {
}
long maxDataSize = parentSizes.get(maxRowIdx);
- long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
- long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
+ long newNumRows = StatsUtils.safeMult(StatsUtils.safeMult(maxRowCount, (numParents - 1)), joinFactor);
+ long newDataSize = StatsUtils.safeMult(StatsUtils.safeMult(maxDataSize, (numParents - 1)), joinFactor);
Statistics wcStats = new Statistics();
- wcStats.setNumRows(setMaxIfInvalid(newNumRows));
- wcStats.setDataSize(setMaxIfInvalid(newDataSize));
+ wcStats.setNumRows(newNumRows);
+ wcStats.setDataSize(newDataSize);
jop.setStatistics(wcStats);
-
+
if (isDebugEnabled) {
LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
}
@@ -1339,6 +1337,7 @@ public class StatsRulesProcFactory {
}
}
+ // No need for overflow checks; selectivity is assumed to be always <= 1.0
float selMultiParent = 1.0f;
for(Operator<? extends OperatorDesc> parent : multiParentOp.getParentOperators()) {
// In the above example, TS-1 -> RS-1 and TS-2 -> RS-2 are simple trees
@@ -1369,8 +1368,8 @@ public class StatsRulesProcFactory {
Operator<? extends OperatorDesc> op = ops.get(i);
if (op != null && op instanceof ReduceSinkOperator) {
ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
- List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
- List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+ List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+ List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys,
rsOp.getColumnExprMap());
if (fqCols.size() == 1) {
String joinCol = fqCols.get(0);
@@ -1400,8 +1399,8 @@ public class StatsRulesProcFactory {
Operator<? extends OperatorDesc> op = ops.get(i);
if (op instanceof ReduceSinkOperator) {
ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
- List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
- List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+ List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+ List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys,
rsOp.getColumnExprMap());
if (fqCols.size() == 1) {
String joinCol = fqCols.get(0);
@@ -1441,7 +1440,7 @@ public class StatsRulesProcFactory {
LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
+ newNumRows + " rows will be set to Long.MAX_VALUE");
}
- newNumRows = setMaxIfInvalid(newNumRows);
+ newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
stats.setNumRows(newNumRows);
// scale down/up the column statistics based on the changes in number of
@@ -1472,7 +1471,7 @@ public class StatsRulesProcFactory {
stats.setColumnStats(colStats);
long newDataSize = StatsUtils
.getDataSizeFromColumnStats(newNumRows, colStats);
- stats.setDataSize(setMaxIfInvalid(newDataSize));
+ stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
}
private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1494,7 +1493,7 @@ public class StatsRulesProcFactory {
for (int i = 0; i < rowCountParents.size(); i++) {
if (i != maxIdx) {
- result *= rowCountParents.get(i);
+ result = StatsUtils.safeMult(result, rowCountParents.get(i));
}
}
@@ -1512,7 +1511,6 @@ public class StatsRulesProcFactory {
// find min NDV for joining columns
for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
String key = entry.getValue().get(joinColIdx);
- key = StatsUtils.stripPrefixFromColumnName(key);
ColStatistics cs = joinedColStats.get(key);
if (cs != null && cs.getCountDistint() < minNDV) {
minNDV = cs.getCountDistint();
@@ -1523,7 +1521,6 @@ public class StatsRulesProcFactory {
if (minNDV != Long.MAX_VALUE) {
for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
String key = entry.getValue().get(joinColIdx);
- key = StatsUtils.stripPrefixFromColumnName(key);
ColStatistics cs = joinedColStats.get(key);
if (cs != null) {
cs.setCountDistint(minNDV);
@@ -1569,7 +1566,7 @@ public class StatsRulesProcFactory {
long denom = 1;
for (int i = 0; i < distinctVals.size(); i++) {
if (i != minIdx) {
- denom *= distinctVals.get(i);
+ denom = StatsUtils.safeMult(denom, distinctVals.get(i));
}
}
return denom;
@@ -1613,12 +1610,13 @@ public class StatsRulesProcFactory {
// in the absence of column statistics, compute data size based on
// based on average row size
Statistics wcStats = parentStats.clone();
+ limit = StatsUtils.getMaxIfOverflow(limit);
if (limit <= parentStats.getNumRows()) {
long numRows = limit;
long avgRowSize = parentStats.getAvgRowSize();
- long dataSize = avgRowSize * limit;
- wcStats.setNumRows(setMaxIfInvalid(numRows));
- wcStats.setDataSize(setMaxIfInvalid(dataSize));
+ long dataSize = StatsUtils.safeMult(avgRowSize, limit);
+ wcStats.setNumRows(numRows);
+ wcStats.setDataSize(dataSize);
}
lop.setStatistics(wcStats);
@@ -1662,26 +1660,26 @@ public class StatsRulesProcFactory {
if (satisfyPrecondition(parentStats)) {
List<ColStatistics> colStats = Lists.newArrayList();
for (String key : outKeyColNames) {
- String prefixedKey = "KEY." + key;
+ String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key;
ExprNodeDesc end = colExprMap.get(prefixedKey);
if (end != null) {
ColStatistics cs = StatsUtils
.getColStatisticsFromExpression(conf, parentStats, end);
if (cs != null) {
- cs.setColumnName(key);
+ cs.setColumnName(prefixedKey);
colStats.add(cs);
}
}
}
for (String val : outValueColNames) {
- String prefixedVal = "VALUE." + val;
+ String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." + val;
ExprNodeDesc end = colExprMap.get(prefixedVal);
if (end != null) {
ColStatistics cs = StatsUtils
.getColStatisticsFromExpression(conf, parentStats, end);
if (cs != null) {
- cs.setColumnName(val);
+ cs.setColumnName(prefixedVal);
colStats.add(cs);
}
}
@@ -1815,7 +1813,7 @@ public class StatsRulesProcFactory {
+ newNumRows + " rows will be set to Long.MAX_VALUE");
}
- newNumRows = setMaxIfInvalid(newNumRows);
+ newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
long oldRowCount = stats.getNumRows();
double ratio = (double) newNumRows / (double) oldRowCount;
stats.setNumRows(newNumRows);
@@ -1842,10 +1840,10 @@ public class StatsRulesProcFactory {
}
stats.setColumnStats(colStats);
long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
- stats.setDataSize(setMaxIfInvalid(newDataSize));
+ stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
} else {
long newDataSize = (long) (ratio * stats.getDataSize());
- stats.setDataSize(setMaxIfInvalid(newDataSize));
+ stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
}
}
@@ -1853,14 +1851,4 @@ public class StatsRulesProcFactory {
return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
&& !stats.getColumnStatsState().equals(Statistics.State.NONE);
}
-
- /**
- * negative number of rows or data sizes are invalid. It could be because of
- * long overflow in which case return Long.MAX_VALUE
- * @param val - input value
- * @return Long.MAX_VALUE if val is negative else val
- */
- static long setMaxIfInvalid(long val) {
- return val < 0 ? Long.MAX_VALUE : val;
- }
}
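
Throughout this file, raw additions and multiplications on row counts, NDVs and data sizes are replaced with StatsUtils.safeAdd/safeMult, and the local setMaxIfInvalid helper deleted above gives way to StatsUtils.getMaxIfOverflow. The StatsUtils implementations are not shown in this diff; a minimal sketch of the saturating behaviour they are expected to provide, assuming non-negative operands as used here:

    // Sketch of saturating long arithmetic; the real versions live in StatsUtils
    // and may differ in detail.
    static long safeAdd(long a, long b) {
      long sum = a + b;
      // For non-negative operands, overflow shows up as a negative result.
      return sum < 0 ? Long.MAX_VALUE : sum;
    }

    static long safeMult(long a, long b) {
      if (a == 0 || b == 0) {
        return 0;
      }
      long product = a * b;
      // Detect wrap-around by checking whether the division round-trips.
      return product / a != b ? Long.MAX_VALUE : product;
    }

    static long getMaxIfOverflow(long val) {
      // A negative row count or data size indicates a prior overflow.
      return val < 0 ? Long.MAX_VALUE : val;
    }

Clamping to Long.MAX_VALUE keeps the Math.min(...) cardinality bounds and the newNumRows/newDataSize assignments above well defined instead of silently going negative on overflow.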
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java Fri Nov 7 20:41:34 2014
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ExplainSQRewriteTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
@@ -55,7 +56,7 @@ public class ExplainSQRewriteSemanticAna
ctx
);
- Task<? extends Serializable> explTask = TaskFactory.get(work, conf);
+ ExplainSQRewriteTask explTask = (ExplainSQRewriteTask) TaskFactory.get(work, conf);
fieldList = explTask.getResultSchema();
rootTasks.add(explTask);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java Fri Nov 7 20:41:34 2014
@@ -106,7 +106,7 @@ public class ExplainSemanticAnalyzer ext
work.setAppendTaskType(
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES));
- Task<? extends Serializable> explTask = TaskFactory.get(work, conf);
+ ExplainTask explTask = (ExplainTask) TaskFactory.get(work, conf);
fieldList = explTask.getResultSchema();
rootTasks.add(explTask);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java Fri Nov 7 20:41:34 2014
@@ -79,4 +79,7 @@ public class JoinCond {
this.joinType = joinType;
}
+ public void setPreserved(boolean preserved) {
+ this.preserved = preserved;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java Fri Nov 7 20:41:34 2014
@@ -238,6 +238,8 @@ public class LoadSemanticAnalyzer extend
// create final load/move work
+ boolean preservePartitionSpecs = false;
+
Map<String, String> partSpec = ts.getPartSpec();
if (partSpec == null) {
partSpec = new LinkedHashMap<String, String>();
@@ -252,9 +254,14 @@ public class LoadSemanticAnalyzer extend
throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION.
getMsg(ts.tableName + ":" + part.getName()));
}
- outputs.add(new WriteEntity(part,
- (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
- WriteEntity.WriteType.INSERT)));
+ if (isOverWrite){
+ outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT_OVERWRITE));
+ } else {
+ outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
+ // If partition already exists and we aren't overwriting it, then respect
+ // its current location info rather than picking it from the parent TableDesc
+ preservePartitionSpecs = true;
+ }
} else {
outputs.add(new WriteEntity(ts.tableHandle,
(isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
@@ -269,6 +276,12 @@ public class LoadSemanticAnalyzer extend
LoadTableDesc loadTableWork;
loadTableWork = new LoadTableDesc(new Path(fromURI),
Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
+ if (preservePartitionSpecs){
+ // Note: preservePartitionSpecs=true implies inheritTableSpecs=false, but
+ // preservePartitionSpecs=false (the default) is not sufficient
+ // information to set inheritTableSpecs=true
+ loadTableWork.setInheritTableSpecs(false);
+ }
Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
getOutputs(), loadTableWork, null, true, isLocal), conf);
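
Condensed, the partition-handling change above says: a partition that already exists and is only being appended to keeps its own spec (and therefore its location), while overwrites and brand-new partitions behave as before. A hedged restatement of that rule, with a hypothetical helper name standing in for the local boolean the committed code uses:

    // Sketch only; the committed code tracks this in the local
    // 'preservePartitionSpecs' flag rather than a helper method.
    static boolean shouldPreservePartitionSpecs(boolean partitionAlreadyExists, boolean isOverWrite) {
      // Only an existing partition that is being appended to keeps its own spec.
      return partitionAlreadyExists && !isOverWrite;
    }

When that flag is true, the LoadTableDesc is told not to inherit table specs; when it is false, nothing can be concluded about inheritTableSpecs, so it is left untouched.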