Posted to commits@hive.apache.org by br...@apache.org on 2014/11/07 21:41:45 UTC

svn commit: r1637444 [6/20] - in /hive/branches/spark: ./ cli/src/test/org/apache/hadoop/hive/cli/ common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/conf/ com...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFAvgDecimal.java Fri Nov  7 20:41:34 2014
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.exec.v
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.hive.common.type.Decimal128;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.DecimalUtil;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationBufferRow;
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hive.common.util.Decimal128FastBuffer;
 
 /**
  * Generated from template VectorUDAFAvg.txt.
@@ -57,24 +56,45 @@ public class VectorUDAFAvgDecimal extend
 
       private static final long serialVersionUID = 1L;
 
-      transient private final Decimal128 sum = new Decimal128();
+      transient private final HiveDecimalWritable sum = new HiveDecimalWritable();
       transient private long count;
       transient private boolean isNull;
 
-      public void sumValueWithCheck(Decimal128 value, short scale) {
+      // We use this to catch overflow.
+      transient private boolean isOutOfRange;
+
+      public void sumValueWithNullCheck(HiveDecimalWritable writable, short scale) {
+        if (isOutOfRange) {
+          return;
+        }
+        HiveDecimal value = writable.getHiveDecimal();
         if (isNull) {
-          sum.update(value);
-          sum.changeScaleDestructive(scale);
+          sum.set(value);
           count = 1;
           isNull = false;
         } else {
-          sum.addDestructive(value, scale);
+          HiveDecimal result;
+          try {
+            result = sum.getHiveDecimal().add(value);
+          } catch (ArithmeticException e) {  // catch on overflow
+            isOutOfRange = true;
+            return;
+          }
+          sum.set(result);
           count++;
         }
       }
 
-      public void sumValueNoCheck(Decimal128 value, short scale) {
-        sum.addDestructive(value, scale);
+      public void sumValueNoNullCheck(HiveDecimalWritable writable, short scale) {
+        HiveDecimal value = writable.getHiveDecimal();
+        HiveDecimal result;
+        try {
+          result = sum.getHiveDecimal().add(value);
+        } catch (ArithmeticException e) {  // catch on overflow
+          isOutOfRange = true;
+          return;
+        }
+        sum.set(result);
         count++;
       }
 
@@ -87,7 +107,8 @@ public class VectorUDAFAvgDecimal extend
       @Override
       public void reset() {
         isNull = true;
-        sum.zeroClear();
+        isOutOfRange = false;
+        sum.set(HiveDecimal.ZERO);
         count = 0L;
       }
     }
@@ -98,8 +119,6 @@ public class VectorUDAFAvgDecimal extend
     transient private HiveDecimalWritable resultSum;
     transient private StructObjectInspector soi;
 
-    transient private final Decimal128FastBuffer scratch;
-
     /**
      * The scale of the SUM in the partial output
      */
@@ -120,12 +139,6 @@ public class VectorUDAFAvgDecimal extend
      */
     private short inputPrecision;
 
-    /**
-     * A value used as scratch to avoid allocating at runtime.
-     * Needed by computations like vector[0] * batchSize
-     */
-    transient private Decimal128 scratchDecimal = new Decimal128();
-
     public VectorUDAFAvgDecimal(VectorExpression inputExpression) {
       this();
       this.inputExpression = inputExpression;
@@ -138,7 +151,6 @@ public class VectorUDAFAvgDecimal extend
       resultSum = new HiveDecimalWritable();
       partialResult[0] = resultCount;
       partialResult[1] = resultSum;
-      scratch = new Decimal128FastBuffer();
 
     }
 
@@ -185,7 +197,7 @@ public class VectorUDAFAvgDecimal extend
 
        DecimalColumnVector inputVector = ( DecimalColumnVector)batch.
         cols[this.inputExpression.getOutputColumn()];
-      Decimal128[] vector = inputVector.vector;
+      HiveDecimalWritable[] vector = inputVector.vector;
 
       if (inputVector.noNulls) {
         if (inputVector.isRepeating) {
@@ -231,7 +243,7 @@ public class VectorUDAFAvgDecimal extend
     private void iterateNoNullsRepeatingWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128 value,
+      HiveDecimalWritable value,
       int batchSize) {
 
       for (int i=0; i < batchSize; ++i) {
@@ -239,14 +251,14 @@ public class VectorUDAFAvgDecimal extend
           aggregationBufferSets,
           bufferIndex,
           i);
-        myagg.sumValueWithCheck(value, this.sumScale);
+        myagg.sumValueWithNullCheck(value, this.sumScale);
       }
     }
 
     private void iterateNoNullsSelectionWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       int[] selection,
       int batchSize) {
 
@@ -255,28 +267,28 @@ public class VectorUDAFAvgDecimal extend
           aggregationBufferSets,
           bufferIndex,
           i);
-        myagg.sumValueWithCheck(values[selection[i]], this.sumScale);
+        myagg.sumValueWithNullCheck(values[selection[i]], this.sumScale);
       }
     }
 
     private void iterateNoNullsWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       int batchSize) {
       for (int i=0; i < batchSize; ++i) {
         Aggregation myagg = getCurrentAggregationBuffer(
           aggregationBufferSets,
           bufferIndex,
           i);
-        myagg.sumValueWithCheck(values[i], this.sumScale);
+        myagg.sumValueWithNullCheck(values[i], this.sumScale);
       }
     }
 
     private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128 value,
+      HiveDecimalWritable value,
       int batchSize,
       int[] selection,
       boolean[] isNull) {
@@ -287,7 +299,7 @@ public class VectorUDAFAvgDecimal extend
             aggregationBufferSets,
             bufferIndex,
             i);
-          myagg.sumValueWithCheck(value, this.sumScale);
+          myagg.sumValueWithNullCheck(value, this.sumScale);
         }
       }
 
@@ -296,7 +308,7 @@ public class VectorUDAFAvgDecimal extend
     private void iterateHasNullsRepeatingWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128 value,
+      HiveDecimalWritable value,
       int batchSize,
       boolean[] isNull) {
 
@@ -306,7 +318,7 @@ public class VectorUDAFAvgDecimal extend
             aggregationBufferSets,
             bufferIndex,
             i);
-          myagg.sumValueWithCheck(value, this.sumScale);
+          myagg.sumValueWithNullCheck(value, this.sumScale);
         }
       }
     }
@@ -314,7 +326,7 @@ public class VectorUDAFAvgDecimal extend
     private void iterateHasNullsSelectionWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       int batchSize,
       int[] selection,
       boolean[] isNull) {
@@ -326,7 +338,7 @@ public class VectorUDAFAvgDecimal extend
             aggregationBufferSets,
             bufferIndex,
             j);
-          myagg.sumValueWithCheck(values[i], this.sumScale);
+          myagg.sumValueWithNullCheck(values[i], this.sumScale);
         }
       }
    }
@@ -334,7 +346,7 @@ public class VectorUDAFAvgDecimal extend
     private void iterateHasNullsWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int bufferIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       int batchSize,
       boolean[] isNull) {
 
@@ -344,7 +356,7 @@ public class VectorUDAFAvgDecimal extend
             aggregationBufferSets,
             bufferIndex,
             i);
-          myagg.sumValueWithCheck(values[i], this.sumScale);
+          myagg.sumValueWithNullCheck(values[i], this.sumScale);
         }
       }
    }
@@ -367,18 +379,31 @@ public class VectorUDAFAvgDecimal extend
 
         Aggregation myagg = (Aggregation)agg;
 
-        Decimal128[] vector = inputVector.vector;
+        HiveDecimalWritable[] vector = inputVector.vector;
 
         if (inputVector.isRepeating) {
           if (inputVector.noNulls) {
             if (myagg.isNull) {
               myagg.isNull = false;
-              myagg.sum.zeroClear();
+              myagg.sum.set(HiveDecimal.ZERO);
               myagg.count = 0;
             }
-            scratchDecimal.update(batchSize);
-            scratchDecimal.multiplyDestructive(vector[0], vector[0].getScale());
-            myagg.sum.update(scratchDecimal);
+            HiveDecimal value = vector[0].getHiveDecimal();
+            HiveDecimal multiple;
+            try {
+              multiple = value.multiply(HiveDecimal.create(batchSize));
+            } catch (ArithmeticException e) {  // catch on overflow
+              myagg.isOutOfRange = true;
+              return;
+            }
+            HiveDecimal result;
+            try {
+              result = myagg.sum.getHiveDecimal().add(multiple);
+            } catch (ArithmeticException e) {  // catch on overflow
+              myagg.isOutOfRange = true;
+              return;
+            }
+            myagg.sum.set(result);
             myagg.count += batchSize;
           }
           return;
@@ -400,7 +425,7 @@ public class VectorUDAFAvgDecimal extend
 
     private void iterateSelectionHasNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         int batchSize,
         boolean[] isNull,
         int[] selected) {
@@ -408,57 +433,57 @@ public class VectorUDAFAvgDecimal extend
       for (int j=0; j< batchSize; ++j) {
         int i = selected[j];
         if (!isNull[i]) {
-          Decimal128 value = vector[i];
-          myagg.sumValueWithCheck(value, this.sumScale);
+          HiveDecimalWritable value = vector[i];
+          myagg.sumValueWithNullCheck(value, this.sumScale);
         }
       }
     }
 
     private void iterateSelectionNoNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         int batchSize,
         int[] selected) {
 
       if (myagg.isNull) {
         myagg.isNull = false;
-        myagg.sum.zeroClear();
+        myagg.sum.set(HiveDecimal.ZERO);
         myagg.count = 0;
       }
 
       for (int i=0; i< batchSize; ++i) {
-        Decimal128 value = vector[selected[i]];
-        myagg.sumValueNoCheck(value, this.sumScale);
+        HiveDecimalWritable value = vector[selected[i]];
+        myagg.sumValueNoNullCheck(value, this.sumScale);
       }
     }
 
     private void iterateNoSelectionHasNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         int batchSize,
         boolean[] isNull) {
 
       for(int i=0;i<batchSize;++i) {
         if (!isNull[i]) {
-          Decimal128 value = vector[i];
-          myagg.sumValueWithCheck(value, this.sumScale);
+          HiveDecimalWritable value = vector[i];
+          myagg.sumValueWithNullCheck(value, this.sumScale);
         }
       }
     }
 
     private void iterateNoSelectionNoNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         int batchSize) {
       if (myagg.isNull) {
         myagg.isNull = false;
-        myagg.sum.zeroClear();
+        myagg.sum.set(HiveDecimal.ZERO);
         myagg.count = 0;
       }
 
       for (int i=0;i<batchSize;++i) {
-        Decimal128 value = vector[i];
-        myagg.sumValueNoCheck(value, this.sumScale);
+        HiveDecimalWritable value = vector[i];
+        myagg.sumValueNoNullCheck(value, this.sumScale);
       }
     }
 
@@ -477,13 +502,13 @@ public class VectorUDAFAvgDecimal extend
     public Object evaluateOutput(
         AggregationBuffer agg) throws HiveException {
       Aggregation myagg = (Aggregation) agg;
-      if (myagg.isNull) {
+      if (myagg.isNull || myagg.isOutOfRange) {
         return null;
       }
       else {
         assert(0 < myagg.count);
         resultCount.set (myagg.count);
-        resultSum.set(HiveDecimal.create(myagg.sum.toBigDecimal()));
+        resultSum.set(myagg.sum.getHiveDecimal());
         return partialResult;
       }
     }

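The hunks above replace the old Decimal128 fast path with HiveDecimalWritable and catch ArithmeticException so that an overflowing AVG yields NULL instead of failing the query. A minimal, self-contained sketch of that accumulation pattern, using only the HiveDecimal/HiveDecimalWritable calls visible in the patch (the class and method names here are illustrative, not part of the commit):

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;

    // Illustrative accumulator: sums HiveDecimal values and latches an
    // out-of-range flag instead of letting ArithmeticException escape.
    class DecimalAccumulator {
      private final HiveDecimalWritable sum = new HiveDecimalWritable();
      private boolean isNull = true;
      private boolean isOutOfRange = false;
      private long count = 0L;

      void add(HiveDecimalWritable writable) {
        if (isOutOfRange) {
          return;                                   // once overflowed, stay out of range
        }
        HiveDecimal value = writable.getHiveDecimal();
        if (isNull) {
          sum.set(value);                           // first non-null value seen
          count = 1;
          isNull = false;
          return;
        }
        try {
          sum.set(sum.getHiveDecimal().add(value)); // add() throws on overflow
          count++;
        } catch (ArithmeticException e) {
          isOutOfRange = true;
        }
      }

      long count() {                                // AVG divides the sum by this downstream
        return count;
      }

      HiveDecimal result() {                        // NULL on empty input or overflow
        return (isNull || isOutOfRange) ? null : sum.getHiveDecimal();
      }
    }
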
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/VectorUDAFSumDecimal.java Fri Nov  7 20:41:34 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates;
 
-import org.apache.hadoop.hive.common.type.Decimal128;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -29,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
@@ -48,15 +48,29 @@ public class VectorUDAFSumDecimal extend
 
       private static final long serialVersionUID = 1L;
 
-      transient private Decimal128 sum = new Decimal128();
+      transient private HiveDecimalWritable sum = new HiveDecimalWritable();
       transient private boolean isNull;
 
-      public void sumValue(Decimal128 value, short scale) {
+      // We use this to catch overflow.
+      transient private boolean isOutOfRange;
+
+      public void sumValue(HiveDecimalWritable writable, short scale) {
+        if (isOutOfRange) {
+          return;
+        }
+        HiveDecimal value = writable.getHiveDecimal();
         if (isNull) {
-          sum.update(value, scale);
+          sum.set(value);
           isNull = false;
         } else {
-          sum.addDestructive(value, scale);
+          HiveDecimal result;
+          try {
+            result = sum.getHiveDecimal().add(value);
+          } catch (ArithmeticException e) {  // catch on overflow
+            isOutOfRange = true;
+            return;
+          }
+          sum.set(result);
         }
       }
 
@@ -68,12 +82,13 @@ public class VectorUDAFSumDecimal extend
       @Override
       public void reset() {
         isNull = true;
-        sum.zeroClear();
+        isOutOfRange = false;
+        sum.set(HiveDecimal.ZERO);
       }
     }
 
     private VectorExpression inputExpression;
-    transient private final Decimal128 scratchDecimal;
+    transient private final HiveDecimalWritable scratchDecimal;
 
     public VectorUDAFSumDecimal(VectorExpression inputExpression) {
       this();
@@ -82,7 +97,7 @@ public class VectorUDAFSumDecimal extend
 
     public VectorUDAFSumDecimal() {
       super();
-      scratchDecimal = new Decimal128();
+      scratchDecimal = new HiveDecimalWritable();
     }
 
     private Aggregation getCurrentAggregationBuffer(
@@ -110,7 +125,7 @@ public class VectorUDAFSumDecimal extend
 
       DecimalColumnVector inputVector = (DecimalColumnVector)batch.
         cols[this.inputExpression.getOutputColumn()];
-      Decimal128[] vector = inputVector.vector;
+      HiveDecimalWritable[] vector = inputVector.vector;
 
       if (inputVector.noNulls) {
         if (inputVector.isRepeating) {
@@ -163,7 +178,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateNoNullsRepeatingWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128 value,
+      HiveDecimalWritable value,
       short scale,
       int batchSize) {
 
@@ -179,7 +194,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateNoNullsSelectionWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       short scale,
       int[] selection,
       int batchSize) {
@@ -196,7 +211,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateNoNullsWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       short scale,
       int batchSize) {
       for (int i=0; i < batchSize; ++i) {
@@ -211,7 +226,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateHasNullsRepeatingSelectionWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128 value,
+      HiveDecimalWritable value,
       short scale,
       int batchSize,
       int[] selection,
@@ -232,7 +247,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateHasNullsRepeatingWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128 value,
+      HiveDecimalWritable value,
       short scale,
       int batchSize,
       boolean[] isNull) {
@@ -251,7 +266,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateHasNullsSelectionWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       short scale,
       int batchSize,
       int[] selection,
@@ -272,7 +287,7 @@ public class VectorUDAFSumDecimal extend
     private void iterateHasNullsWithAggregationSelection(
       VectorAggregationBufferRow[] aggregationBufferSets,
       int aggregateIndex,
-      Decimal128[] values,
+      HiveDecimalWritable[] values,
       short scale,
       int batchSize,
       boolean[] isNull) {
@@ -305,18 +320,34 @@ public class VectorUDAFSumDecimal extend
       }
 
       Aggregation myagg = (Aggregation)agg;
+      if (myagg.isOutOfRange) {
+        return;
+      }
 
-      Decimal128[] vector = inputVector.vector;
+      HiveDecimalWritable[] vector = inputVector.vector;
 
       if (inputVector.isRepeating) {
         if ((inputVector.noNulls) || !inputVector.isNull[0]) {
           if (myagg.isNull) {
             myagg.isNull = false;
-            myagg.sum.zeroClear();
+            myagg.sum.set(HiveDecimal.ZERO);
+          }
+          HiveDecimal value = vector[0].getHiveDecimal();
+          HiveDecimal multiple;
+          try {
+            multiple = value.multiply(HiveDecimal.create(batchSize));
+          } catch (ArithmeticException e) {  // catch on overflow
+            myagg.isOutOfRange = true;
+            return;
           }
-          scratchDecimal.update(batchSize);
-          scratchDecimal.multiplyDestructive(vector[0], inputVector.scale);
-          myagg.sum.addDestructive(scratchDecimal, inputVector.scale);
+          HiveDecimal result;
+          try {
+            result = myagg.sum.getHiveDecimal().add(multiple);
+          } catch (ArithmeticException e) {  // catch on overflow
+            myagg.isOutOfRange = true;
+            return;
+          }
+          myagg.sum.set(result);
         }
         return;
       }
@@ -337,7 +368,7 @@ public class VectorUDAFSumDecimal extend
 
     private void iterateSelectionHasNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         short scale,
         int batchSize,
         boolean[] isNull,
@@ -346,66 +377,94 @@ public class VectorUDAFSumDecimal extend
       for (int j=0; j< batchSize; ++j) {
         int i = selected[j];
         if (!isNull[i]) {
-          Decimal128 value = vector[i];
           if (myagg.isNull) {
             myagg.isNull = false;
-            myagg.sum.zeroClear();
+            myagg.sum.set(HiveDecimal.ZERO);
+          }
+          HiveDecimal value = vector[i].getHiveDecimal();
+          HiveDecimal result;
+          try {
+            result = myagg.sum.getHiveDecimal().add(value);
+          } catch (ArithmeticException e) {  // catch on overflow
+            myagg.isOutOfRange = true;
+            return;
           }
-          myagg.sum.addDestructive(value, scale);
+          myagg.sum.set(result);
         }
       }
     }
 
     private void iterateSelectionNoNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         short scale,
         int batchSize,
         int[] selected) {
 
       if (myagg.isNull) {
-        myagg.sum.zeroClear();
+        myagg.sum.set(HiveDecimal.ZERO);
         myagg.isNull = false;
       }
 
       for (int i=0; i< batchSize; ++i) {
-        Decimal128 value = vector[selected[i]];
-        myagg.sum.addDestructive(value, scale);
+        HiveDecimal value = vector[selected[i]].getHiveDecimal();
+        HiveDecimal result;
+        try {
+          result = myagg.sum.getHiveDecimal().add(value);
+        } catch (ArithmeticException e) {  // catch on overflow
+          myagg.isOutOfRange = true;
+          return;
+        }
+        myagg.sum.set(result);
       }
     }
 
     private void iterateNoSelectionHasNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         short scale,
         int batchSize,
         boolean[] isNull) {
 
       for(int i=0;i<batchSize;++i) {
         if (!isNull[i]) {
-          Decimal128 value = vector[i];
           if (myagg.isNull) {
-            myagg.sum.zeroClear();
+            myagg.sum.set(HiveDecimal.ZERO);
             myagg.isNull = false;
           }
-          myagg.sum.addDestructive(value, scale);
+          HiveDecimal value = vector[i].getHiveDecimal();
+          HiveDecimal result;
+          try {
+            result = myagg.sum.getHiveDecimal().add(value);
+          } catch (ArithmeticException e) {  // catch on overflow
+            myagg.isOutOfRange = true;
+            return;
+          }
+          myagg.sum.set(result);
         }
       }
     }
 
     private void iterateNoSelectionNoNulls(
         Aggregation myagg,
-        Decimal128[] vector,
+        HiveDecimalWritable[] vector,
         short scale,
         int batchSize) {
       if (myagg.isNull) {
-        myagg.sum.zeroClear();
+        myagg.sum.set(HiveDecimal.ZERO);
         myagg.isNull = false;
       }
 
       for (int i=0;i<batchSize;++i) {
-        Decimal128 value = vector[i];
-        myagg.sum.addDestructive(value, scale);
+        HiveDecimal value = vector[i].getHiveDecimal();
+        HiveDecimal result;
+        try {
+          result = myagg.sum.getHiveDecimal().add(value);
+        } catch (ArithmeticException e) {  // catch on overflow
+          myagg.isOutOfRange = true;
+          return;
+        }
+        myagg.sum.set(result);
       }
     }
 
@@ -423,11 +482,11 @@ public class VectorUDAFSumDecimal extend
     @Override
     public Object evaluateOutput(AggregationBuffer agg) throws HiveException {
       Aggregation myagg = (Aggregation) agg;
-      if (myagg.isNull) {
+      if (myagg.isNull || myagg.isOutOfRange) {
         return null;
       }
       else {
-        return HiveDecimal.create(myagg.sum.toBigDecimal());
+        return myagg.sum.getHiveDecimal();
       }
     }
 

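VectorUDAFSumDecimal follows the same overflow-latching scheme, and its isRepeating branch replaces a per-row loop with a single value * batchSize multiply. A hedged sketch of that shortcut, mirroring the patch's assumption that overflow in multiply()/add() surfaces as ArithmeticException (class and method names are illustrative):

    import org.apache.hadoop.hive.common.type.HiveDecimal;

    // Illustrative: when every row of a batch repeats the same decimal value,
    // sum += value * batchSize equals adding the value batchSize times, and
    // both the multiply and the add are guarded against overflow.
    final class RepeatingBatchSum {
      static HiveDecimal addRepeating(HiveDecimal sum, HiveDecimal value, int batchSize) {
        try {
          HiveDecimal multiple = value.multiply(HiveDecimal.create(batchSize));
          return sum.add(multiple);
        } catch (ArithmeticException e) {   // overflow in either step
          return null;                      // caller treats null as "out of range"
        }
      }

      public static void main(String[] args) {
        HiveDecimal sum = HiveDecimal.create("10.5");
        sum = addRepeating(sum, HiveDecimal.create("2.25"), 1024);
        System.out.println(sum);            // 2.25 * 1024 + 10.5 = 2314.5
      }
    }
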
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/udf/VectorUDFAdaptor.java Fri Nov  7 20:41:34 2014
@@ -329,10 +329,10 @@ public class VectorUDFAdaptor extends Ve
     } else if (outputOI instanceof WritableHiveDecimalObjectInspector) {
       DecimalColumnVector dcv = (DecimalColumnVector) colVec;
       if (value instanceof HiveDecimal) {
-        dcv.vector[i].update(((HiveDecimal) value).bigDecimalValue());
+        dcv.set(i, (HiveDecimal) value);
       } else {
         HiveDecimal hd = ((WritableHiveDecimalObjectInspector) outputOI).getPrimitiveJavaObject(value);
-        dcv.vector[i].update(hd.bigDecimalValue());
+        dcv.set(i, hd);
       }
     } else {
       throw new RuntimeException("Unhandled object type " + outputOI.getTypeName());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Fri Nov  7 20:41:34 2014
@@ -74,7 +74,12 @@ public final class HiveFileFormatUtils {
         SequenceFileOutputFormat.class, HiveSequenceFileOutputFormat.class);
   }
 
-  static String realoutputFormat;
+  private static ThreadLocal<String> tRealOutputFormat = new ThreadLocal<String>() {
+    @Override
+    protected String initialValue() {
+      return null;
+    }
+  };
 
   @SuppressWarnings("unchecked")
   private static Map<Class<? extends OutputFormat>, Class<? extends HiveOutputFormat>>
@@ -105,11 +110,9 @@ public final class HiveFileFormatUtils {
     }
     Class<? extends HiveOutputFormat> result = outputFormatSubstituteMap
         .get(origin);
-    //register this output format into the map for the first time
-    if ((storagehandlerflag == true) && (result == null)) {
+    if ((storagehandlerflag == true) && (result == null || result == HivePassThroughOutputFormat.class)) {
       HiveFileFormatUtils.setRealOutputFormatClassName(origin.getName());
       result = HivePassThroughOutputFormat.class;
-      HiveFileFormatUtils.registerOutputFormatSubstitute((Class<? extends OutputFormat>) origin,HivePassThroughOutputFormat.class);
     }
     return result;
   }
@@ -120,7 +123,7 @@ public final class HiveFileFormatUtils {
   @SuppressWarnings("unchecked")
   public static String getRealOutputFormatClassName()
   {
-    return realoutputFormat;
+    return tRealOutputFormat.get();
   }
 
   /**
@@ -129,7 +132,7 @@ public final class HiveFileFormatUtils {
   public static void setRealOutputFormatClassName(
       String destination) {
     if (destination != null){
-      realoutputFormat = destination;
+      tRealOutputFormat.set(destination);
     }
     else {
       return;

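The HiveFileFormatUtils change swaps a shared static String for a ThreadLocal, so concurrent sessions resolving different storage handlers no longer overwrite each other's "real" output format name. A small generic sketch of the pattern (the class name is illustrative):

    // Illustrative: per-thread storage for what used to be a plain static field.
    final class PerThreadName {
      private static final ThreadLocal<String> NAME = new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
          return null;                 // nothing registered yet on this thread
        }
      };

      static void set(String name) {
        if (name != null) {
          NAME.set(name);              // visible only to the calling thread
        }
      }

      static String get() {
        return NAME.get();
      }
    }
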
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Fri Nov  7 20:41:34 2014
@@ -30,14 +30,17 @@ import org.apache.avro.file.CodecFactory
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,7 +50,9 @@ import org.apache.hadoop.util.Progressab
  * Write to an Avro file from a Hive process.
  */
 public class AvroContainerOutputFormat
-        implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
+        implements HiveOutputFormat<WritableComparable, AvroGenericRecordWritable> {
+
+  public static final Log LOG = LogFactory.getLog(AvroContainerOutputFormat.class);
 
   @Override
   public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
@@ -75,21 +80,62 @@ public class AvroContainerOutputFormat
     return new AvroGenericRecordWriter(dfw);
   }
 
-    //no records will be emitted from Hive
-  @Override
-  public RecordWriter<LongWritable, AvroGenericRecordWritable>
-  getRecordWriter(FileSystem ignored, JobConf job, String name,
-      Progressable progress) {
-    return new RecordWriter<LongWritable, AvroGenericRecordWritable>() {
-      @Override
-      public void write(LongWritable key, AvroGenericRecordWritable value) {
-        throw new RuntimeException("Should not be called");
-      }
+  class WrapperRecordWriter<K extends Writable,V extends Writable> implements RecordWriter<K, V> {
+    FileSinkOperator.RecordWriter hiveWriter = null;
+    JobConf jobConf;
+    Progressable progressable;
+    String fileName;
+
+    public WrapperRecordWriter(JobConf jobConf, Progressable progressable, String fileName){
+      this.progressable = progressable;
+      this.jobConf = jobConf;
+      this.fileName = fileName;
+    }
+
+    private FileSinkOperator.RecordWriter getHiveWriter() throws IOException {
+      if (this.hiveWriter == null){
+        Properties properties = new Properties();
+        for (AvroSerdeUtils.AvroTableProperties tableProperty : AvroSerdeUtils.AvroTableProperties.values()){
+          String propVal;
+          if((propVal = jobConf.get(tableProperty.getPropName())) != null){
+            properties.put(tableProperty.getPropName(),propVal);
+          }
+        }
+
+        Boolean isCompressed = jobConf.getBoolean("mapreduce.output.fileoutputformat.compress", false);
+        Path path = new Path(this.fileName);
+        if(path.getFileSystem(jobConf).isDirectory(path)){
+          // This path is only potentially encountered during setup
+          // Otherwise, a specific part_xxxx file name is generated and passed in.
+          path = new Path(path,"_dummy");
+        }
 
-      @Override
-      public void close(Reporter reporter) {
+        this.hiveWriter = getHiveRecordWriter(jobConf,path,null,isCompressed, properties, progressable);
       }
-    };
+      return this.hiveWriter;
+    }
+
+    @Override
+    public void write(K key, V value) throws IOException {
+      getHiveWriter().write(value);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      // Normally, I'd worry about the blanket false being passed in here, and that
+      // it'd need to be integrated into an abort call for an OutputCommitter, but the
+      // underlying recordwriter ignores it and throws it away, so it's irrelevant.
+      getHiveWriter().close(false);
+    }
+
+  }
+
+    //no records will be emitted from Hive
+  @Override
+  public RecordWriter<WritableComparable, AvroGenericRecordWritable>
+  getRecordWriter(FileSystem ignored, JobConf job, String fileName,
+      Progressable progress) throws IOException {
+    return new WrapperRecordWriter<WritableComparable, AvroGenericRecordWritable>(job,progress,fileName);
   }
 
   @Override

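The new WrapperRecordWriter defers building the underlying Hive writer until the first record (or close) arrives, so the table properties and the possibly-dummy output path are resolved lazily. A stripped-down sketch of that lazy-delegate shape, with hypothetical names:

    import java.io.IOException;

    // Illustrative lazy delegate, mirroring WrapperRecordWriter.getHiveWriter():
    // the expensive writer is created at most once, on first use.
    abstract class LazyDelegatingWriter<V> {
      interface Writer<T> {
        void write(T value) throws IOException;
        void close() throws IOException;
      }

      private Writer<V> delegate;

      /** Expensive construction, e.g. getHiveRecordWriter(...); called at most once. */
      protected abstract Writer<V> open() throws IOException;

      private Writer<V> delegate() throws IOException {
        if (delegate == null) {
          delegate = open();
        }
        return delegate;
      }

      public void write(V value) throws IOException {
        delegate().write(value);
      }

      public void close() throws IOException {
        delegate().close();            // as in the patch, close() also forces creation
      }
    }
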
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Nov  7 20:41:34 2014
@@ -1259,12 +1259,9 @@ class RecordReaderImpl implements Record
         if (!result.isNull[0]) {
           BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
           short scaleInData = (short) scaleStream.next();
-          result.vector[0].update(bInt, scaleInData);
-
-          // Change the scale to match the schema if the scale in data is different.
-          if (scale != scaleInData) {
-            result.vector[0].changeScaleDestructive((short) scale);
-          }
+          HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
+          dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
+          result.set(0, dec);
         }
       } else {
         // result vector has isNull values set, use the same to read scale vector.
@@ -1273,13 +1270,10 @@ class RecordReaderImpl implements Record
         for (int i = 0; i < batchSize; i++) {
           if (!result.isNull[i]) {
             BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-            result.vector[i].update(bInt, (short) scratchScaleVector.vector[i]);
-
-            // Change the scale to match the schema if the scale is less than in data.
-            // (HIVE-7373) If scale is bigger, then it leaves the original trailing zeros
-            if (scale < scratchScaleVector.vector[i]) {
-              result.vector[i].changeScaleDestructive((short) scale);
-            }
+            short scaleInData = (short) scratchScaleVector.vector[i];
+            HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
+            dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
+            result.set(i, dec);
           }
         }
       }

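In the ORC reader, each decimal is now rebuilt as an immutable HiveDecimal from the unscaled BigInteger plus the scale stream and then clamped to the schema's precision/scale, instead of mutating a Decimal128 in place. A sketch of that conversion; the import location of HiveDecimalUtils is an assumption, since the hunk shows only the unqualified name:

    import java.math.BigInteger;

    import org.apache.hadoop.hive.common.type.HiveDecimal;
    // Assumed package for HiveDecimalUtils (not shown in the hunk).
    import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;

    // Illustrative: decode one decimal value the way the patched reader does.
    final class OrcDecimalRead {
      static HiveDecimal read(BigInteger unscaled, short scaleInData,
                              int schemaPrecision, int schemaScale) {
        HiveDecimal dec = HiveDecimal.create(unscaled, scaleInData);
        // May return null when the value does not fit the target precision/scale.
        return HiveDecimalUtils.enforcePrecisionScale(dec, schemaPrecision, schemaScale);
      }
    }
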
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Fri Nov  7 20:41:34 2014
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.hive.ql.log;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * PerfLogger.
  *
@@ -147,10 +147,37 @@ public class PerfLogger {
   }
 
   public Long getStartTime(String method) {
-    return startTimes.get(method);
+    long startTime = 0L;
+
+    if (startTimes.containsKey(method)) {
+      startTime = startTimes.get(method);
+    }
+    return startTime;
   }
 
   public Long getEndTime(String method) {
-    return endTimes.get(method);
+    long endTime = 0L;
+
+    if (endTimes.containsKey(method)) {
+      endTime = endTimes.get(method);
+    }
+    return endTime;
   }
+
+  public boolean startTimeHasMethod(String method) {
+    return startTimes.containsKey(method);
+  }
+
+  public boolean endTimeHasMethod(String method) {
+    return endTimes.containsKey(method);
+  }
+
+  public Long getDuration(String method) {
+    long duration = 0;
+    if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
+      duration = endTimes.get(method) - startTimes.get(method);
+    }
+    return duration;
+  }
+
 }

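The PerfLogger additions make lookups for unknown methods return 0 rather than a null Long (which would NPE on unboxing) and add a getDuration() convenience. A trimmed-down sketch of the same lookup logic (the class name is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative: duration lookup that tolerates missing start/end entries,
    // mirroring PerfLogger.getDuration().
    final class Durations {
      private final Map<String, Long> startTimes = new HashMap<String, Long>();
      private final Map<String, Long> endTimes = new HashMap<String, Long>();

      void start(String method) { startTimes.put(method, System.currentTimeMillis()); }
      void end(String method)   { endTimes.put(method, System.currentTimeMillis()); }

      long getDuration(String method) {
        if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
          return endTimes.get(method) - startTimes.get(method);
        }
        return 0L;                     // unknown method: report zero instead of failing
      }
    }
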
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Fri Nov  7 20:41:34 2014
@@ -18,16 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
@@ -39,7 +29,6 @@ import org.apache.hadoop.hive.ql.exec.Jo
 import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
 import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
@@ -76,6 +65,16 @@ import org.apache.hadoop.hive.ql.plan.pt
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
 /**
  * Factory for generating the different node processors used by ColumnPruner.
  */
@@ -600,8 +599,7 @@ public final class ColumnPrunerProcFacto
           // revert output cols of SEL(*) to ExprNodeColumnDesc
           String[] tabcol = rr.reverseLookup(col);
           ColumnInfo colInfo = rr.get(tabcol[0], tabcol[1]);
-          ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo.getType(),
-              colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+          ExprNodeColumnDesc colExpr = new ExprNodeColumnDesc(colInfo);
           colList.add(colExpr);
           outputColNames.add(col);
         }

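This file, SortedDynPartitionOptimizer, and JoinCondTypeCheckProcFactory below all collapse the four-argument ExprNodeColumnDesc construction into new ExprNodeColumnDesc(colInfo). The convenience constructor itself is not shown in this part of the revision; based on the arguments it replaces, it presumably behaves like this hedged helper:

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;

    // Illustrative equivalent of the new ExprNodeColumnDesc(ColumnInfo) call sites:
    // forward the same four ColumnInfo fields the old code passed explicitly.
    final class ColumnDescs {
      static ExprNodeColumnDesc fromColumnInfo(ColumnInfo colInfo) {
        return new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(),
            colInfo.getTabAlias(), colInfo.getIsVirtualCol());
      }
    }
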
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SetReducerParallelism.java Fri Nov  7 20:41:34 2014
@@ -31,11 +31,13 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.StatsRulesProcFactory;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
@@ -82,7 +84,8 @@ public class SetReducerParallelism imple
         for (Operator<? extends OperatorDesc> sibling:
           sink.getChildOperators().get(0).getParentOperators()) {
           if (sibling.getStatistics() != null) {
-            numberOfBytes += sibling.getStatistics().getDataSize();
+            numberOfBytes = StatsUtils.safeAdd(
+                numberOfBytes, sibling.getStatistics().getDataSize());
           } else {
             LOG.warn("No stats available from: "+sibling);
           }

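SetReducerParallelism now accumulates sibling data sizes with StatsUtils.safeAdd() so the byte estimate cannot wrap a long into a negative number. The committed safeAdd() body is not part of this hunk; a hedged sketch of what such an overflow-safe add typically looks like:

    // Illustrative overflow-safe addition (not the committed StatsUtils.safeAdd body):
    // for non-negative data sizes, saturate at Long.MAX_VALUE instead of wrapping negative.
    final class SafeMath {
      static long safeAdd(long a, long b) {
        long sum = a + b;
        // Overflow occurred iff both operands share a sign and the result's sign differs.
        if (((a ^ sum) & (b ^ sum)) < 0) {
          return Long.MAX_VALUE;
        }
        return sum;
      }

      public static void main(String[] args) {
        System.out.println(safeAdd(Long.MAX_VALUE, 10));  // Long.MAX_VALUE, not negative
        System.out.println(safeAdd(3, 4));                // 7
      }
    }
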
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Fri Nov  7 20:41:34 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -164,12 +165,23 @@ public class SkewJoinOptimizer implement
         return null;
       }
 
+      // have to create a QBJoinTree for the cloned join operator
+      QBJoinTree originJoinTree = parseContext.getJoinContext().get(joinOp);
+      QBJoinTree newJoinTree;
+      try {
+        newJoinTree = originJoinTree.clone();
+      } catch (CloneNotSupportedException e) {
+        LOG.debug("QBJoinTree could not be cloned: ", e);
+        return null;
+      }
+
       JoinOperator joinOpClone;
       if (processSelect) {
         joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
       } else {
         joinOpClone = (JoinOperator)currOpClone;
       }
+      parseContext.getJoinContext().put(joinOpClone, newJoinTree);
 
       List<TableScanOperator> tableScanCloneOpsForJoin =
           new ArrayList<TableScanOperator>();
@@ -201,6 +213,7 @@ public class SkewJoinOptimizer implement
         }
 
         parseContext.getTopOps().put(newAlias, tso);
+        setUpAlias(originJoinTree, newJoinTree, tabAlias, newAlias, tso);
       }
 
       // Now do a union of the select operators: selectOp and selectOpClone
@@ -610,6 +623,48 @@ public class SkewJoinOptimizer implement
         }
       }
     }
+
+    /**
+     * Set alias in the cloned join tree
+     */
+    private static void setUpAlias(QBJoinTree origin, QBJoinTree cloned, String origAlias,
+        String newAlias, Operator<? extends OperatorDesc> topOp) {
+      cloned.getAliasToOpInfo().remove(origAlias);
+      cloned.getAliasToOpInfo().put(newAlias, topOp);
+      if (origin.getLeftAlias().equals(origAlias)) {
+        cloned.setLeftAlias(null);
+        cloned.setLeftAlias(newAlias);
+      }
+      replaceAlias(origin.getLeftAliases(), cloned.getLeftAliases(), origAlias, newAlias);
+      replaceAlias(origin.getRightAliases(), cloned.getRightAliases(), origAlias, newAlias);
+      replaceAlias(origin.getBaseSrc(), cloned.getBaseSrc(), origAlias, newAlias);
+      replaceAlias(origin.getMapAliases(), cloned.getMapAliases(), origAlias, newAlias);
+      replaceAlias(origin.getStreamAliases(), cloned.getStreamAliases(), origAlias, newAlias);
+    }
+
+    private static void replaceAlias(String[] origin, String[] cloned,
+        String alias, String newAlias) {
+      if (origin == null || cloned == null || origin.length != cloned.length) {
+        return;
+      }
+      for (int i = 0; i < origin.length; i++) {
+        if (origin[i].equals(alias)) {
+          cloned[i] = newAlias;
+        }
+      }
+    }
+
+    private static void replaceAlias(List<String> origin, List<String> cloned,
+        String alias, String newAlias) {
+      if (origin == null || cloned == null || origin.size() != cloned.size()) {
+        return;
+      }
+      for (int i = 0; i < origin.size(); i++) {
+        if (origin.get(i).equals(alias)) {
+          cloned.set(i, newAlias);
+        }
+      }
+    }
   }
 
   /* (non-Javadoc)

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Fri Nov  7 20:41:34 2014
@@ -18,14 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,8 +66,14 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 /**
  * When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization
@@ -157,7 +157,11 @@ public class SortedDynPartitionOptimizer
       // the reduce sink key. Since both key columns are not prefix subset
       // ReduceSinkDeDuplication will not merge them together resulting in 2 MR jobs.
       // To avoid that we will remove the RS (and EX) inserted by enforce bucketing/sorting.
-      removeRSInsertedByEnforceBucketing(fsOp);
+      if (!removeRSInsertedByEnforceBucketing(fsOp)) {
+        LOG.debug("Bailing out of sort dynamic partition optimization as some partition columns " +
+            "got constant folded.");
+        return null;
+      }
 
       // unlink connection between FS and its parent
       Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
@@ -209,8 +213,7 @@ public class SortedDynPartitionOptimizer
       ArrayList<ExprNodeDesc> newValueCols = Lists.newArrayList();
       Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
       for (ColumnInfo ci : valColInfo) {
-        newValueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), ci
-            .getTabAlias(), ci.isHiddenVirtualCol()));
+        newValueCols.add(new ExprNodeColumnDesc(ci));
         colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1));
       }
       ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
@@ -263,7 +266,7 @@ public class SortedDynPartitionOptimizer
 
     // Remove RS and EX introduced by enforce bucketing/sorting config
     // Convert PARENT -> RS -> EX -> FS to PARENT -> FS
-    private void removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
+    private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
       HiveConf hconf = parseCtx.getConf();
       boolean enforceBucketing = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCEBUCKETING);
       boolean enforceSorting = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCESORTING);
@@ -298,17 +301,27 @@ public class SortedDynPartitionOptimizer
           Operator<? extends OperatorDesc> rsGrandChild = rsChild.getChildOperators().get(0);
 
           if (rsChild instanceof ExtractOperator) {
+            // if schema size cannot be matched, then it could be because of constant folding
+            // converting partition column expression to constant expression. The constant
+            // expression will then get pruned by column pruner since it will not reference to
+            // any columns.
+            if (rsParent.getSchema().getSignature().size() !=
+                rsChild.getSchema().getSignature().size()) {
+              return false;
+            }
             rsParent.getChildOperators().clear();
             rsParent.getChildOperators().add(rsGrandChild);
             rsGrandChild.getParentOperators().clear();
             rsGrandChild.getParentOperators().add(rsParent);
             parseCtx.removeOpParseCtx(rsToRemove);
             parseCtx.removeOpParseCtx(rsChild);
-            LOG.info("Removed " + rsParent.getOperatorId() + " and " + rsChild.getOperatorId()
+            LOG.info("Removed " + rsToRemove.getOperatorId() + " and " + rsChild.getOperatorId()
                 + " as it was introduced by enforce bucketing/sorting.");
           }
         }
       }
+
+      return true;
     }
 
     private List<Integer> getPartitionPositions(DynamicPartitionCtx dpCtx, RowSchema schema) {
@@ -476,8 +489,7 @@ public class SortedDynPartitionOptimizer
 
       for (Integer idx : pos) {
         ColumnInfo ci = colInfos.get(idx);
-        ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
-            ci.getTabAlias(), ci.isHiddenVirtualCol());
+        ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci);
         cols.add(encd);
       }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/HiveOptiqUtil.java Fri Nov  7 20:41:34 2014
@@ -75,7 +75,7 @@ public class HiveOptiqUtil {
     return vCols;
   }
 
-  public static boolean validateASTForCBO(ASTNode ast) {
+  public static boolean validateASTForUnsupportedTokens(ASTNode ast) {
     String astTree = ast.toStringTree();
     // if any of following tokens are present in AST, bail out
     String[] tokens = { "TOK_CHARSETLITERAL","TOK_TABLESPLITSAMPLE" };

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/rules/PartitionPruner.java Fri Nov  7 20:41:34 2014
@@ -108,7 +108,7 @@ public class PartitionPruner {
       boolean argsPruned = false;
 
       GenericUDF hiveUDF = SqlFunctionConverter.getHiveUDF(call.getOperator(),
-          call.getType());
+          call.getType(), call.operands.size());
       if (hiveUDF != null &&
           !FunctionRegistry.isDeterministic(hiveUDF)) {
         return null;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/ExprNodeConverter.java Fri Nov  7 20:41:34 2014
@@ -89,17 +89,17 @@ public class ExprNodeConverter extends R
       ArrayList<ExprNodeDesc> tmpExprArgs = new ArrayList<ExprNodeDesc>();
       tmpExprArgs.addAll(args.subList(0, 2));
       gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
-          SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), tmpExprArgs);
+          SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), 2), tmpExprArgs);
       for (int i = 2; i < call.operands.size(); i++) {
         tmpExprArgs = new ArrayList<ExprNodeDesc>();
         tmpExprArgs.add(gfDesc);
         tmpExprArgs.add(args.get(i));
         gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
-            SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), tmpExprArgs);
+            SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), 2), tmpExprArgs);
       }
     } else {
       gfDesc = new ExprNodeGenericFuncDesc(TypeConverter.convert(call.getType()),
-          SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType()), args);
+          SqlFunctionConverter.getHiveUDF(call.getOperator(), call.getType(), args.size()), args);
     }
 
     return gfDesc;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/JoinCondTypeCheckProcFactory.java Fri Nov  7 20:41:34 2014
@@ -17,15 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.optiq.translator;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
@@ -47,6 +38,15 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
 /**
  * JoinCondTypeCheckProcFactory is used by Optiq planner(CBO) to generate Join Conditions from Join Condition AST.
  * Reasons for sub class:
@@ -99,8 +99,7 @@ public class JoinCondTypeCheckProcFactor
       if (!qualifiedAccess) {
         colInfo = getColInfo(ctx, null, tableOrCol, expr);
         // It's a column.
-        return new ExprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName(),
-            colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+        return new ExprNodeColumnDesc(colInfo);
       } else if (hasTableAlias(ctx, tableOrCol, expr)) {
         return null;
       } else {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/SqlFunctionConverter.java Fri Nov  7 20:41:34 2014
@@ -98,10 +98,19 @@ public class SqlFunctionConverter {
     return getOptiqFn(name, optiqArgTypes, retType);
   }
 
-  public static GenericUDF getHiveUDF(SqlOperator op, RelDataType dt) {
+  public static GenericUDF getHiveUDF(SqlOperator op, RelDataType dt, int argsLength) {
     String name = reverseOperatorMap.get(op);
-    if (name == null)
+    if (name == null) {
       name = op.getName();
+    }
+    // Make sure we handle unary + and - correctly.
+    if (argsLength == 1) {
+      if ("+".equals(name)) {
+        name = FunctionRegistry.UNARY_PLUS_FUNC_NAME;
+      } else if ("-".equals(name)) {
+        name = FunctionRegistry.UNARY_MINUS_FUNC_NAME;
+      }
+    }
     FunctionInfo hFn = name != null ? FunctionRegistry.getFunctionInfo(name) : null;
     if (hFn == null)
       hFn = handleExplicitCast(op, dt);

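The new operand-count argument is what lets a one-operand "+" or "-" resolve to Hive's unary UDF instead of the binary one. A minimal caller-side sketch, mirroring the PartitionPruner and ExprNodeConverter call sites in this commit (the `call` variable is the RexCall already in scope at those sites):

    // Forward the RexCall's operand count so that, e.g., a single-operand "-"
    // is looked up as FunctionRegistry.UNARY_MINUS_FUNC_NAME rather than as
    // binary subtraction.
    GenericUDF hiveUDF = SqlFunctionConverter.getHiveUDF(call.getOperator(),
        call.getType(), call.operands.size());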
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/optiq/translator/TypeConverter.java Fri Nov  7 20:41:34 2014
@@ -187,7 +187,7 @@ public class TypeConverter {
       throw new RuntimeException("Unsupported Type : " + type.getTypeName());
     }
 
-    return convertedType;
+    return dtFactory.createTypeWithNullability(convertedType, true);
   }
 
   public static RelDataType convert(ListTypeInfo lstType,

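The single-line change above wraps every converted primitive type in createTypeWithNullability, so the CBO sees Hive columns as nullable. A small sketch of the factory call in isolation (the variable names below are illustrative; only dtFactory comes from the surrounding method):

    // Hive columns can always contain NULLs, so the converted Optiq type is
    // re-created with nullability switched on before it is returned.
    RelDataType base = dtFactory.createSqlType(SqlTypeName.INTEGER);
    RelDataType nullable = dtFactory.createTypeWithNullability(base, true);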
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Fri Nov  7 20:41:34 2014
@@ -18,13 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -60,6 +53,13 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * GenMRSkewJoinProcessor.
  *
@@ -192,9 +192,7 @@ public final class GenMRSkewJoinProcesso
         String newColName = i + "_VALUE_" + k; // any name, it does not matter.
         ColumnInfo columnInfo = new ColumnInfo(newColName, type, alias.toString(), false);
         columnInfos.add(columnInfo);
-        newValueExpr.add(new ExprNodeColumnDesc(
-            columnInfo.getType(), columnInfo.getInternalName(),
-            columnInfo.getTabAlias(), false));
+        newValueExpr.add(new ExprNodeColumnDesc(columnInfo));
         if (!first) {
           colNames = colNames + ",";
           colTypes = colTypes + ",";
@@ -216,9 +214,7 @@ public final class GenMRSkewJoinProcesso
         ColumnInfo columnInfo = new ColumnInfo(joinKeys.get(k), TypeInfoFactory
             .getPrimitiveTypeInfo(joinKeyTypes.get(k)), alias.toString(), false);
         columnInfos.add(columnInfo);
-        newKeyExpr.add(new ExprNodeColumnDesc(
-            columnInfo.getType(), columnInfo.getInternalName(),
-            columnInfo.getTabAlias(), false));
+        newKeyExpr.add(new ExprNodeColumnDesc(columnInfo));
       }
 
       newJoinValues.put(alias, newValueExpr);

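Both hunks above, like the JoinCondTypeCheckProcFactory change earlier in this commit, switch to an ExprNodeColumnDesc constructor that takes the ColumnInfo directly. The constructor body is not part of this diff; it is assumed to be a thin convenience wrapper along these lines:

    // Assumed shape of the convenience constructor used above; the
    // authoritative version lives in
    // org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.
    public ExprNodeColumnDesc(ColumnInfo colInfo) {
      this(colInfo.getType(), colInfo.getInternalName(),
          colInfo.getTabAlias(), colInfo.getIsVirtualCol());
    }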
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Fri Nov  7 20:41:34 2014
@@ -18,13 +18,8 @@
 
 package org.apache.hadoop.hive.ql.optimizer.stats.annotation;
 
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -77,8 +73,13 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 public class StatsRulesProcFactory {
 
@@ -170,7 +171,7 @@ public class StatsRulesProcFactory {
           // in case of select(*) the data size does not change
           if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
             long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
-            stats.setDataSize(setMaxIfInvalid(dataSize));
+            stats.setDataSize(dataSize);
           }
           sop.setStatistics(stats);
 
@@ -322,8 +323,8 @@ public class StatsRulesProcFactory {
         } else if (udf instanceof GenericUDFOPOr) {
           // for OR condition independently compute and update stats
           for (ExprNodeDesc child : genFunc.getChildren()) {
-            newNumRows += evaluateChildExpr(stats, child, aspCtx, neededCols,
-                fop);
+            newNumRows = StatsUtils.safeAdd(
+                evaluateChildExpr(stats, child, aspCtx, neededCols, fop), newNumRows);
           }
         } else if (udf instanceof GenericUDFOPNot) {
           newNumRows = evaluateNotExpr(stats, pred, aspCtx, neededCols, fop);
@@ -677,9 +678,9 @@ public class StatsRulesProcFactory {
             if (cs != null) {
               long ndv = cs.getCountDistint();
               if (cs.getNumNulls() > 0) {
-                ndv += 1;
+                ndv = StatsUtils.safeAdd(ndv, 1);
               }
-              ndvProduct *= ndv;
+              ndvProduct = StatsUtils.safeMult(ndvProduct, ndv);
             } else {
               if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
                 // the column must be an aggregate column inserted by GBY. We
@@ -714,15 +715,16 @@ public class StatsRulesProcFactory {
             if (mapSideHashAgg) {
               if (containsGroupingSet) {
                 // Case 4: column stats, hash aggregation, grouping sets
-                cardinality = Math.min((parentNumRows * sizeOfGroupingSet) / 2,
-                    ndvProduct * parallelism * sizeOfGroupingSet);
+                cardinality = Math.min(
+                    (StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet)) / 2,
+                    StatsUtils.safeMult(StatsUtils.safeMult(ndvProduct, parallelism), sizeOfGroupingSet));
 
                 if (isDebugEnabled) {
                   LOG.debug("[Case 4] STATS-" + gop.toString() + ": cardinality: " + cardinality);
                 }
               } else {
                 // Case 3: column stats, hash aggregation, NO grouping sets
-                cardinality = Math.min(parentNumRows / 2, ndvProduct * parallelism);
+                cardinality = Math.min(parentNumRows / 2, StatsUtils.safeMult(ndvProduct, parallelism));
 
                 if (isDebugEnabled) {
                   LOG.debug("[Case 3] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -731,7 +733,7 @@ public class StatsRulesProcFactory {
             } else {
               if (containsGroupingSet) {
                 // Case 6: column stats, NO hash aggregation, grouping sets
-                cardinality = parentNumRows * sizeOfGroupingSet;
+                cardinality = StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet);
 
                 if (isDebugEnabled) {
                   LOG.debug("[Case 6] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -758,7 +760,7 @@ public class StatsRulesProcFactory {
 
             if (containsGroupingSet) {
               // Case 8: column stats, grouping sets
-              cardinality = Math.min(parentNumRows, ndvProduct * sizeOfGroupingSet);
+              cardinality = Math.min(parentNumRows, StatsUtils.safeMult(ndvProduct, sizeOfGroupingSet));
 
               if (isDebugEnabled) {
                 LOG.debug("[Case 8] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -789,7 +791,7 @@ public class StatsRulesProcFactory {
 
               if (containsGroupingSet) {
                 // Case 2: NO column stats, NO hash aggregation, grouping sets
-                cardinality = parentNumRows * sizeOfGroupingSet;
+                cardinality = StatsUtils.safeMult(parentNumRows, sizeOfGroupingSet);
 
                 if (isDebugEnabled) {
                   LOG.debug("[Case 2] STATS-" + gop.toString() + ": cardinality: " + cardinality);
@@ -828,7 +830,6 @@ public class StatsRulesProcFactory {
             // for those newly added columns
             if (!colExprMap.containsKey(ci.getInternalName())) {
               String colName = ci.getInternalName();
-              colName = StatsUtils.stripPrefixFromColumnName(colName);
               String tabAlias = ci.getTabAlias();
               String colType = ci.getTypeName();
               ColStatistics cs = new ColStatistics(tabAlias, colName, colType);
@@ -902,7 +903,7 @@ public class StatsRulesProcFactory {
         long avgKeySize = 0;
         for (ColStatistics cs : colStats) {
           if (cs != null) {
-            numEstimatedRows *= cs.getCountDistint();
+            numEstimatedRows = StatsUtils.safeMult(numEstimatedRows, cs.getCountDistint());
             avgKeySize += Math.ceil(cs.getAvgColLen());
           }
         }
@@ -956,7 +957,7 @@ public class StatsRulesProcFactory {
         long hashEntrySize = gop.javaHashEntryOverHead + avgKeySize + avgValSize;
 
         // estimated hash table size
-        long estHashTableSize = numEstimatedRows * hashEntrySize;
+        long estHashTableSize = StatsUtils.safeMult(numEstimatedRows, hashEntrySize);
 
         if (estHashTableSize < maxMemHashAgg) {
           return true;
@@ -1065,7 +1066,7 @@ public class StatsRulesProcFactory {
 
           // detect if there are multiple attributes in join key
           ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
-          List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols();
+          List<String> keyExprs = rsOp.getConf().getOutputKeyColumnNames();
           numAttr = keyExprs.size();
 
           // infer PK-FK relationship in single attribute join case
@@ -1077,7 +1078,7 @@ public class StatsRulesProcFactory {
             ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
 
             Statistics parentStats = parent.getStatistics();
-            keyExprs = parent.getConf().getKeyCols();
+            keyExprs = parent.getConf().getOutputKeyColumnNames();
 
             // Parent RS may have column statistics from multiple parents.
             // Populate table alias to row count map, this will be used later to
@@ -1096,8 +1097,8 @@ public class StatsRulesProcFactory {
             // used to quickly look-up for column statistics of join key.
             // TODO: expressions in join condition will be ignored. assign
             // internal name for expressions and estimate column statistics for expression.
-            List<String> fqCols =
-                StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap());
+            List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keyExprs,
+                parent.getColumnExprMap());
             joinKeys.put(pos, fqCols);
 
             // get column statistics for all output columns
@@ -1119,7 +1120,6 @@ public class StatsRulesProcFactory {
             for (int idx = 0; idx < numAttr; idx++) {
               for (Integer i : joinKeys.keySet()) {
                 String col = joinKeys.get(i).get(idx);
-                col = StatsUtils.stripPrefixFromColumnName(col);
                 ColStatistics cs = joinedColStats.get(col);
                 if (cs != null) {
                   perAttrDVs.add(cs.getCountDistint());
@@ -1136,13 +1136,12 @@ public class StatsRulesProcFactory {
               denom = getEasedOutDenominator(distinctVals);
             } else {
               for (Long l : distinctVals) {
-                denom *= l;
+                denom = StatsUtils.safeMult(denom, l);
               }
             }
           } else {
             for (List<String> jkeys : joinKeys.values()) {
               for (String jk : jkeys) {
-                jk = StatsUtils.stripPrefixFromColumnName(jk);
                 ColStatistics cs = joinedColStats.get(jk);
                 if (cs != null) {
                   distinctVals.add(cs.getCountDistint());
@@ -1166,7 +1165,6 @@ public class StatsRulesProcFactory {
             ExprNodeDesc end = colExprMap.get(key);
             if (end instanceof ExprNodeColumnDesc) {
               String colName = ((ExprNodeColumnDesc) end).getColumn();
-              colName = StatsUtils.stripPrefixFromColumnName(colName);
               String tabAlias = ((ExprNodeColumnDesc) end).getTabAlias();
               String fqColName = StatsUtils.getFullyQualifiedColumnName(tabAlias, colName);
               ColStatistics cs = joinedColStats.get(fqColName);
@@ -1214,13 +1212,13 @@ public class StatsRulesProcFactory {
           }
 
           long maxDataSize = parentSizes.get(maxRowIdx);
-          long newNumRows = (long) (joinFactor * maxRowCount * (numParents - 1));
-          long newDataSize = (long) (joinFactor * maxDataSize * (numParents - 1));
+          long newNumRows = StatsUtils.safeMult(StatsUtils.safeMult(maxRowCount, (numParents - 1)), joinFactor);
+          long newDataSize = StatsUtils.safeMult(StatsUtils.safeMult(maxDataSize, (numParents - 1)), joinFactor);
           Statistics wcStats = new Statistics();
-          wcStats.setNumRows(setMaxIfInvalid(newNumRows));
-          wcStats.setDataSize(setMaxIfInvalid(newDataSize));
+          wcStats.setNumRows(newNumRows);
+          wcStats.setDataSize(newDataSize);
           jop.setStatistics(wcStats);
-
+ 
           if (isDebugEnabled) {
             LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
           }
@@ -1339,6 +1337,7 @@ public class StatsRulesProcFactory {
         }
       }
 
+      // No need for overflow checks, assume selectivity is always <= 1.0
       float selMultiParent = 1.0f;
       for(Operator<? extends OperatorDesc> parent : multiParentOp.getParentOperators()) {
         // In the above example, TS-1 -> RS-1 and TS-2 -> RS-2 are simple trees
@@ -1369,8 +1368,8 @@ public class StatsRulesProcFactory {
         Operator<? extends OperatorDesc> op = ops.get(i);
         if (op != null && op instanceof ReduceSinkOperator) {
           ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
-          List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
-          List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+          List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+          List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys,
               rsOp.getColumnExprMap());
           if (fqCols.size() == 1) {
             String joinCol = fqCols.get(0);
@@ -1400,8 +1399,8 @@ public class StatsRulesProcFactory {
           Operator<? extends OperatorDesc> op = ops.get(i);
           if (op instanceof ReduceSinkOperator) {
             ReduceSinkOperator rsOp = (ReduceSinkOperator) op;
-            List<ExprNodeDesc> keys = rsOp.getConf().getKeyCols();
-            List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(keys,
+            List<String> keys = rsOp.getConf().getOutputKeyColumnNames();
+            List<String> fqCols = StatsUtils.getFullyQualifedReducerKeyNames(keys,
                 rsOp.getColumnExprMap());
             if (fqCols.size() == 1) {
               String joinCol = fqCols.get(0);
@@ -1441,7 +1440,7 @@ public class StatsRulesProcFactory {
         LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
           + newNumRows + " rows will be set to Long.MAX_VALUE");
       }
-      newNumRows = setMaxIfInvalid(newNumRows);
+      newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
       stats.setNumRows(newNumRows);
 
       // scale down/up the column statistics based on the changes in number of
@@ -1472,7 +1471,7 @@ public class StatsRulesProcFactory {
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils
           .getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(setMaxIfInvalid(newDataSize));
+      stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
     }
 
     private long computeNewRowCount(List<Long> rowCountParents, long denom) {
@@ -1494,7 +1493,7 @@ public class StatsRulesProcFactory {
 
       for (int i = 0; i < rowCountParents.size(); i++) {
         if (i != maxIdx) {
-          result *= rowCountParents.get(i);
+          result = StatsUtils.safeMult(result, rowCountParents.get(i));
         }
       }
 
@@ -1512,7 +1511,6 @@ public class StatsRulesProcFactory {
         // find min NDV for joining columns
         for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
           String key = entry.getValue().get(joinColIdx);
-          key = StatsUtils.stripPrefixFromColumnName(key);
           ColStatistics cs = joinedColStats.get(key);
           if (cs != null && cs.getCountDistint() < minNDV) {
             minNDV = cs.getCountDistint();
@@ -1523,7 +1521,6 @@ public class StatsRulesProcFactory {
         if (minNDV != Long.MAX_VALUE) {
           for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
             String key = entry.getValue().get(joinColIdx);
-            key = StatsUtils.stripPrefixFromColumnName(key);
             ColStatistics cs = joinedColStats.get(key);
             if (cs != null) {
               cs.setCountDistint(minNDV);
@@ -1569,7 +1566,7 @@ public class StatsRulesProcFactory {
         long denom = 1;
         for (int i = 0; i < distinctVals.size(); i++) {
           if (i != minIdx) {
-            denom *= distinctVals.get(i);
+            denom = StatsUtils.safeMult(denom, distinctVals.get(i));
           }
         }
         return denom;
@@ -1613,12 +1610,13 @@ public class StatsRulesProcFactory {
             // in the absence of column statistics, compute data size based on
             // based on average row size
             Statistics wcStats = parentStats.clone();
+            limit = StatsUtils.getMaxIfOverflow(limit);
             if (limit <= parentStats.getNumRows()) {
               long numRows = limit;
               long avgRowSize = parentStats.getAvgRowSize();
-              long dataSize = avgRowSize * limit;
-              wcStats.setNumRows(setMaxIfInvalid(numRows));
-              wcStats.setDataSize(setMaxIfInvalid(dataSize));
+              long dataSize = StatsUtils.safeMult(avgRowSize, limit);
+              wcStats.setNumRows(numRows);
+              wcStats.setDataSize(dataSize);
             }
             lop.setStatistics(wcStats);
 
@@ -1662,26 +1660,26 @@ public class StatsRulesProcFactory {
           if (satisfyPrecondition(parentStats)) {
             List<ColStatistics> colStats = Lists.newArrayList();
             for (String key : outKeyColNames) {
-              String prefixedKey = "KEY." + key;
+              String prefixedKey = Utilities.ReduceField.KEY.toString() + "." + key;
               ExprNodeDesc end = colExprMap.get(prefixedKey);
               if (end != null) {
                 ColStatistics cs = StatsUtils
                     .getColStatisticsFromExpression(conf, parentStats, end);
                 if (cs != null) {
-                  cs.setColumnName(key);
+                  cs.setColumnName(prefixedKey);
                   colStats.add(cs);
                 }
               }
             }
 
             for (String val : outValueColNames) {
-              String prefixedVal = "VALUE." + val;
+              String prefixedVal = Utilities.ReduceField.VALUE.toString() + "." + val;
               ExprNodeDesc end = colExprMap.get(prefixedVal);
               if (end != null) {
                 ColStatistics cs = StatsUtils
                     .getColStatisticsFromExpression(conf, parentStats, end);
                 if (cs != null) {
-                  cs.setColumnName(val);
+                  cs.setColumnName(prefixedVal);
                   colStats.add(cs);
                 }
               }
@@ -1815,7 +1813,7 @@ public class StatsRulesProcFactory {
           + newNumRows + " rows will be set to Long.MAX_VALUE");
     }
 
-    newNumRows = setMaxIfInvalid(newNumRows);
+    newNumRows = StatsUtils.getMaxIfOverflow(newNumRows);
     long oldRowCount = stats.getNumRows();
     double ratio = (double) newNumRows / (double) oldRowCount;
     stats.setNumRows(newNumRows);
@@ -1842,10 +1840,10 @@ public class StatsRulesProcFactory {
       }
       stats.setColumnStats(colStats);
       long newDataSize = StatsUtils.getDataSizeFromColumnStats(newNumRows, colStats);
-      stats.setDataSize(setMaxIfInvalid(newDataSize));
+      stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
     } else {
       long newDataSize = (long) (ratio * stats.getDataSize());
-      stats.setDataSize(setMaxIfInvalid(newDataSize));
+      stats.setDataSize(StatsUtils.getMaxIfOverflow(newDataSize));
     }
   }
 
@@ -1853,14 +1851,4 @@ public class StatsRulesProcFactory {
     return stats != null && stats.getBasicStatsState().equals(Statistics.State.COMPLETE)
         && !stats.getColumnStatsState().equals(Statistics.State.NONE);
   }
-
-  /**
-   * negative number of rows or data sizes are invalid. It could be because of
-   * long overflow in which case return Long.MAX_VALUE
-   * @param val - input value
-   * @return Long.MAX_VALUE if val is negative else val
-   */
-  static long setMaxIfInvalid(long val) {
-    return val < 0 ? Long.MAX_VALUE : val;
-  }
 }

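The recurring theme in this file is replacing raw long arithmetic on row counts, NDVs and data sizes with StatsUtils.safeAdd/safeMult, and replacing the removed local setMaxIfInvalid helper with StatsUtils.getMaxIfOverflow. The StatsUtils implementations are not part of this diff; the following is a hedged sketch of saturating helpers consistent with how the call sites use them (the joinFactor call sites would additionally need an overload taking a floating-point factor):

    // Saturating addition: overflow is clamped to Long.MAX_VALUE.
    public static long safeAdd(long a, long b) {
      long r = a + b;
      // Overflow occurred iff both operands have the same sign and the
      // result's sign differs from it.
      if (((a ^ r) & (b ^ r)) < 0) {
        return Long.MAX_VALUE;
      }
      return r;
    }

    // Saturating multiplication: overflow is clamped to Long.MAX_VALUE.
    public static long safeMult(long a, long b) {
      long r = a * b;
      // Detect wrap-around by dividing back, guarding the one case where the
      // division itself overflows (Long.MIN_VALUE / -1).
      if (a != 0 && (r / a != b || (a == -1 && b == Long.MIN_VALUE))) {
        return Long.MAX_VALUE;
      }
      return r;
    }

    // Same contract as the removed setMaxIfInvalid: a negative count or size
    // is treated as the result of overflow and clamped to Long.MAX_VALUE.
    public static long getMaxIfOverflow(long val) {
      return val < 0 ? Long.MAX_VALUE : val;
    }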
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSQRewriteSemanticAnalyzer.java Fri Nov  7 20:41:34 2014
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.exec.ExplainSQRewriteTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
@@ -55,7 +56,7 @@ public class ExplainSQRewriteSemanticAna
         ctx
         );
 
-    Task<? extends Serializable> explTask = TaskFactory.get(work, conf);
+    ExplainSQRewriteTask explTask = (ExplainSQRewriteTask) TaskFactory.get(work, conf);
 
     fieldList = explTask.getResultSchema();
     rootTasks.add(explTask);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java Fri Nov  7 20:41:34 2014
@@ -106,7 +106,7 @@ public class ExplainSemanticAnalyzer ext
     work.setAppendTaskType(
         HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES));
 
-    Task<? extends Serializable> explTask = TaskFactory.get(work, conf);
+    ExplainTask explTask = (ExplainTask) TaskFactory.get(work, conf);
 
     fieldList = explTask.getResultSchema();
     rootTasks.add(explTask);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/JoinCond.java Fri Nov  7 20:41:34 2014
@@ -79,4 +79,7 @@ public class JoinCond {
     this.joinType = joinType;
   }
 
+  public void setPreserved(boolean preserved) {
+    this.preserved = preserved;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java?rev=1637444&r1=1637443&r2=1637444&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java Fri Nov  7 20:41:34 2014
@@ -238,6 +238,8 @@ public class LoadSemanticAnalyzer extend
 
     // create final load/move work
 
+    boolean preservePartitionSpecs = false;
+
     Map<String, String> partSpec = ts.getPartSpec();
     if (partSpec == null) {
       partSpec = new LinkedHashMap<String, String>();
@@ -252,9 +254,14 @@ public class LoadSemanticAnalyzer extend
             throw new SemanticException(ErrorMsg.OFFLINE_TABLE_OR_PARTITION.
                 getMsg(ts.tableName + ":" + part.getName()));
           }
-          outputs.add(new WriteEntity(part,
-          (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
-              WriteEntity.WriteType.INSERT)));
+          if (isOverWrite){
+            outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT_OVERWRITE));
+          } else {
+            outputs.add(new WriteEntity(part, WriteEntity.WriteType.INSERT));
+            // If partition already exists and we aren't overwriting it, then respect
+            // its current location info rather than picking it from the parent TableDesc
+            preservePartitionSpecs = true;
+          }
         } else {
           outputs.add(new WriteEntity(ts.tableHandle,
           (isOverWrite ? WriteEntity.WriteType.INSERT_OVERWRITE :
@@ -269,6 +276,12 @@ public class LoadSemanticAnalyzer extend
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
       Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
+    if (preservePartitionSpecs){
+      // Note: preservePartitionSpecs=true implies inheritTableSpecs=false, but
+      // preservePartitionSpecs=false (the default) is not by itself enough
+      // information to set inheritTableSpecs=true
+      loadTableWork.setInheritTableSpecs(false);
+    }
 
     Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(),
         getOutputs(), loadTableWork, null, true, isLocal), conf);