You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ai...@apache.org on 2016/01/26 20:30:33 UTC

hive git commit: HIVE-12889: Support COUNT(DISTINCT) for partitioning query. (Aihua Xu, reviewed by Szehon Ho)

Repository: hive
Updated Branches:
  refs/heads/master eb4a16448 -> 27172bcb4


HIVE-12889: Support COUNT(DISTINCT) for partitioning query. (Aihua Xu, reviewed by Szehon Ho)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/27172bcb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/27172bcb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/27172bcb

Branch: refs/heads/master
Commit: 27172bcb4d01efaa9c308ea014baf5ec9ed6208e
Parents: eb4a164
Author: Aihua Xu <ai...@apache.org>
Authored: Tue Jan 19 11:24:30 2016 -0500
Committer: Aihua Xu <ai...@apache.org>
Committed: Tue Jan 26 14:28:29 2016 -0500

----------------------------------------------------------------------
 data/files/windowing_distinct.txt               |  6 ++
 .../functions/HiveSqlCountAggFunction.java      | 10 ++-
 .../functions/HiveSqlSumAggFunction.java        |  9 ++-
 .../translator/PlanModifierForASTConv.java      |  2 +-
 .../translator/SqlFunctionConverter.java        | 51 +++++++++----
 .../hadoop/hive/ql/parse/CalcitePlanner.java    |  4 +-
 .../hadoop/hive/ql/parse/IdentifiersParser.g    |  2 +-
 .../hadoop/hive/ql/parse/PTFInvocationSpec.java | 16 +++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  5 --
 .../hadoop/hive/ql/parse/WindowingSpec.java     | 22 +++++-
 .../hive/ql/udf/generic/GenericUDAFCount.java   | 44 ++++++++++-
 .../queries/clientpositive/windowing_distinct.q | 30 ++++++++
 .../clientpositive/windowing_distinct.q.out     | 78 ++++++++++++++++++++
 .../objectinspector/ObjectInspectorUtils.java   | 32 ++++++++
 14 files changed, 272 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/data/files/windowing_distinct.txt
----------------------------------------------------------------------
diff --git a/data/files/windowing_distinct.txt b/data/files/windowing_distinct.txt
new file mode 100644
index 0000000..9271202
--- /dev/null
+++ b/data/files/windowing_distinct.txt
@@ -0,0 +1,6 @@
+1|19|442|65553|4294967380|26.43|37.77|true|alice zipper|2013-03-01 09:11:58.703217|29.62|^Ahistory^B
+2|124|336|65664|4294967435|74.72|42.47|true|bob davidson|2013-03-01 09:11:58.703302|45.40|^Ayard duty^B
+1|19|442|65553|4294967380|26.43|37.77|true|alice zipper|2013-03-01 09:11:58.703217|29.62|^Ahistory^B
+1|35|387|65619|4294967459|96.91|18.86|false|katie davidson|2013-03-01 09:11:58.703079|27.32|^Ahistory^B
+2|111|372|65656|4294967312|13.01|34.95|false|xavier quirinius|2013-03-01 09:11:58.703310|23.91|^Atopology^B
+2|124|336|65664|4294967435|74.72|42.47|true|bob davidson|2013-03-01 09:11:58.703302|45.40|^Ayard duty^B

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
index 7937040..58191e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlCountAggFunction.java
@@ -32,11 +32,12 @@ import org.apache.calcite.util.ImmutableIntList;
 
 public class HiveSqlCountAggFunction extends SqlAggFunction {
 
+  final boolean                isDistinct;
   final SqlReturnTypeInference returnTypeInference;
   final SqlOperandTypeInference operandTypeInference;
   final SqlOperandTypeChecker operandTypeChecker;
 
-  public HiveSqlCountAggFunction(SqlReturnTypeInference returnTypeInference,
+  public HiveSqlCountAggFunction(boolean isDistinct, SqlReturnTypeInference returnTypeInference,
       SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) {
     super(
         "count",
@@ -45,11 +46,16 @@ public class HiveSqlCountAggFunction extends SqlAggFunction {
         operandTypeInference,
         operandTypeChecker,
         SqlFunctionCategory.NUMERIC);
+    this.isDistinct = isDistinct;
     this.returnTypeInference = returnTypeInference;
     this.operandTypeChecker = operandTypeChecker;
     this.operandTypeInference = operandTypeInference;
   }
 
+  public boolean isDistinct() {
+    return isDistinct;
+  }
+
   @Override
   public <T> T unwrap(Class<T> clazz) {
     if (clazz == SqlSplittableAggFunction.class) {
@@ -64,7 +70,7 @@ public class HiveSqlCountAggFunction extends SqlAggFunction {
     public AggregateCall other(RelDataTypeFactory typeFactory, AggregateCall e) {
 
       return AggregateCall.create(
-          new HiveSqlCountAggFunction(returnTypeInference, operandTypeInference, operandTypeChecker),
+          new HiveSqlCountAggFunction(isDistinct, returnTypeInference, operandTypeInference, operandTypeChecker),
           false, ImmutableIntList.of(), -1,
           typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true), "count");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
index 8f62970..056eaeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/functions/HiveSqlSumAggFunction.java
@@ -47,14 +47,14 @@ import com.google.common.collect.ImmutableList;
  * is the same type.
  */
 public class HiveSqlSumAggFunction extends SqlAggFunction {
-
+  final boolean isDistinct;
   final SqlReturnTypeInference returnTypeInference;
   final SqlOperandTypeInference operandTypeInference;
   final SqlOperandTypeChecker operandTypeChecker;
 
   //~ Constructors -----------------------------------------------------------
 
-  public HiveSqlSumAggFunction(SqlReturnTypeInference returnTypeInference,
+  public HiveSqlSumAggFunction(boolean isDistinct, SqlReturnTypeInference returnTypeInference,
     SqlOperandTypeInference operandTypeInference, SqlOperandTypeChecker operandTypeChecker) {
     super(
         "sum",
@@ -66,6 +66,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction {
     this.returnTypeInference = returnTypeInference;
     this.operandTypeChecker = operandTypeChecker;
     this.operandTypeInference = operandTypeInference;
+    this.isDistinct = isDistinct;
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -85,7 +86,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction {
     public AggregateCall other(RelDataTypeFactory typeFactory, AggregateCall e) {
       RelDataType countRetType = typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true);
       return AggregateCall.create(
-        new HiveSqlCountAggFunction(ReturnTypes.explicit(countRetType), operandTypeInference, operandTypeChecker),
+        new HiveSqlCountAggFunction(isDistinct, ReturnTypes.explicit(countRetType), operandTypeInference, operandTypeChecker),
         false, ImmutableIntList.of(), -1, countRetType, "count");
     }
 
@@ -116,7 +117,7 @@ public class HiveSqlSumAggFunction extends SqlAggFunction {
         throw new AssertionError("unexpected count " + merges);
       }
       int ordinal = extra.register(node);
-      return AggregateCall.create(new HiveSqlSumAggFunction(returnTypeInference, operandTypeInference, operandTypeChecker),
+      return AggregateCall.create(new HiveSqlSumAggFunction(isDistinct, returnTypeInference, operandTypeInference, operandTypeChecker),
           false, ImmutableList.of(ordinal), -1, aggregateCall.type, aggregateCall.name);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java
index e2fbb4f..1a543fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/PlanModifierForASTConv.java
@@ -379,7 +379,7 @@ public class PlanModifierForASTConv {
     RelDataType longType = TypeConverter.convert(TypeInfoFactory.longTypeInfo, typeFactory);
     RelDataType intType = TypeConverter.convert(TypeInfoFactory.intTypeInfo, typeFactory);
     // Create the dummy aggregation.
-    SqlAggFunction countFn = SqlFunctionConverter.getCalciteAggFn("count",
+    SqlAggFunction countFn = SqlFunctionConverter.getCalciteAggFn("count", false,
         ImmutableList.of(intType), longType);
     // TODO: Using 0 might be wrong; might need to walk down to find the
     // proper index of a dummy.

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
index 37249f9..75c38fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/SqlFunctionConverter.java
@@ -217,10 +217,18 @@ public class SqlFunctionConverter {
         } else if (op.kind == SqlKind.PLUS_PREFIX) {
           node = (ASTNode) ParseDriver.adaptor.create(HiveParser.PLUS, "PLUS");
         } else {
-          if (op.getName().toUpperCase().equals(SqlStdOperatorTable.COUNT.getName())
-              && children.size() == 0) {
-            node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONSTAR,
+          // Handle 'COUNT' function for the case of COUNT(*) and COUNT(DISTINCT)
+          if (op instanceof HiveSqlCountAggFunction) {
+            if (children.size() == 0) {
+              node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONSTAR,
                 "TOK_FUNCTIONSTAR");
+            } else {
+              HiveSqlCountAggFunction countFunction = (HiveSqlCountAggFunction)op;
+              if (countFunction.isDistinct()) {
+                node = (ASTNode) ParseDriver.adaptor.create(HiveParser.TOK_FUNCTIONDI,
+                    "TOK_FUNCTIONDI");
+              }
+            }
           }
           node.addChild((ASTNode) ParseDriver.adaptor.create(HiveParser.Identifier, op.getName()));
         }
@@ -416,33 +424,46 @@ public class SqlFunctionConverter {
     return calciteOp;
   }
 
-  public static SqlAggFunction getCalciteAggFn(String hiveUdfName,
+  public static SqlAggFunction getCalciteAggFn(String hiveUdfName, boolean isDistinct,
       ImmutableList<RelDataType> calciteArgTypes, RelDataType calciteRetType) {
     SqlAggFunction calciteAggFn = (SqlAggFunction) hiveToCalcite.get(hiveUdfName);
 
     if (calciteAggFn == null) {
-      CalciteUDFInfo uInf = getUDFInfo(hiveUdfName, calciteArgTypes, calciteRetType);
+      CalciteUDFInfo udfInfo = getUDFInfo(hiveUdfName, calciteArgTypes, calciteRetType);
 
       switch (hiveUdfName.toLowerCase()) {
         case "sum":
-          calciteAggFn = new HiveSqlSumAggFunction(uInf.returnTypeInference,
-              uInf.operandTypeInference, uInf.operandTypeChecker);
+          calciteAggFn = new HiveSqlSumAggFunction(
+              isDistinct,
+              udfInfo.returnTypeInference,
+              udfInfo.operandTypeInference,
+              udfInfo.operandTypeChecker);
           break;
         case "count":
-          calciteAggFn = new HiveSqlCountAggFunction(uInf.returnTypeInference,
-              uInf.operandTypeInference, uInf.operandTypeChecker);
+          calciteAggFn = new HiveSqlCountAggFunction(
+              isDistinct,
+              udfInfo.returnTypeInference,
+              udfInfo.operandTypeInference,
+              udfInfo.operandTypeChecker);
           break;
         case "min":
-          calciteAggFn = new HiveSqlMinMaxAggFunction(uInf.returnTypeInference,
-              uInf.operandTypeInference, uInf.operandTypeChecker, true);
+          calciteAggFn = new HiveSqlMinMaxAggFunction(
+              udfInfo.returnTypeInference,
+              udfInfo.operandTypeInference,
+              udfInfo.operandTypeChecker, true);
           break;
         case "max":
-          calciteAggFn = new HiveSqlMinMaxAggFunction(uInf.returnTypeInference,
-              uInf.operandTypeInference, uInf.operandTypeChecker, false);
+          calciteAggFn = new HiveSqlMinMaxAggFunction(
+              udfInfo.returnTypeInference,
+              udfInfo.operandTypeInference,
+              udfInfo.operandTypeChecker, false);
           break;
         default:
-          calciteAggFn = new CalciteUDAF(uInf.udfName, uInf.returnTypeInference,
-              uInf.operandTypeInference, uInf.operandTypeChecker);
+          calciteAggFn = new CalciteUDAF(
+              udfInfo.udfName,
+              udfInfo.returnTypeInference,
+              udfInfo.operandTypeInference,
+              udfInfo.operandTypeChecker);
           break;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
index 3fefbd7..8cc3747 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java
@@ -1950,7 +1950,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
 
       // 3. Get Aggregation FN from Calcite given name, ret type and input arg
       // type
-      final SqlAggFunction aggregation = SqlFunctionConverter.getCalciteAggFn(agg.m_udfName,
+      final SqlAggFunction aggregation = SqlFunctionConverter.getCalciteAggFn(agg.m_udfName, agg.m_distinct,
           aggArgRelDTBldr.build(), aggFnRetType);
 
       return new AggregateCall(aggregation, agg.m_distinct, argList, aggFnRetType, null);
@@ -2646,7 +2646,7 @@ public class CalcitePlanner extends SemanticAnalyzer {
 
         // 5. Get Calcite Agg Fn
         final SqlAggFunction calciteAggFn = SqlFunctionConverter.getCalciteAggFn(
-            hiveAggInfo.m_udfName, calciteAggFnArgsType, calciteAggFnRetType);
+            hiveAggInfo.m_udfName, hiveAggInfo.m_distinct, calciteAggFnArgsType, calciteAggFnRetType);
 
         // 6. Translate Window spec
         RowResolver inputRR = relToHiveRR.get(srcRel);

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 15ca754..61bd10c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -195,7 +195,7 @@ function
     RPAREN (KW_OVER ws=window_specification)?
            -> {$star != null}? ^(TOK_FUNCTIONSTAR functionName $ws?)
            -> {$dist == null}? ^(TOK_FUNCTION functionName (selectExpression+)? $ws?)
-                            -> ^(TOK_FUNCTIONDI functionName (selectExpression+)?)
+                            -> ^(TOK_FUNCTIONDI functionName (selectExpression+)? $ws?)
     ;
 
 functionName

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
index 29b8510..a8980eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
@@ -411,6 +411,18 @@ public class PTFInvocationSpec {
       this.expressions = columns;
     }
 
+    /**
+     * Add order expressions from the list of expressions in the format of ASTNode
+     * @param args
+     */
+    public void addExpressions(ArrayList<ASTNode> nodes) {
+      for (int i = 0; i < nodes.size(); i++) {
+        OrderExpression expr = new OrderExpression();
+        expr.setExpression(nodes.get(i));
+        addExpression(expr);
+      }
+    }
+
     public void addExpression(OrderExpression c)
     {
       expressions = expressions == null ? new ArrayList<OrderExpression>() : expressions;
@@ -500,7 +512,9 @@ public class PTFInvocationSpec {
   {
     Order order;
 
-    public OrderExpression() {}
+    public OrderExpression() {
+      order = Order.ASC;
+    }
 
     public OrderExpression(PartitionExpression peSpec)
     {

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 5ff90a6..8c880c3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -11640,11 +11640,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       break;
     }
 
-    if ( wfSpec.isDistinct() ) {
-      throw new SemanticException(generateErrorMessage(node,
-          "Count/Sum distinct not supported with Windowing"));
-    }
-
     wfSpec.setExpression(node);
 
     ASTNode nameNode = (ASTNode) node.getChild(0);

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
index a181f7c..1bfe8d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
@@ -122,6 +122,9 @@ public class WindowingSpec {
       WindowFunctionSpec wFn = (WindowFunctionSpec) expr;
       WindowSpec wdwSpec = wFn.getWindowSpec();
 
+      // 0. Precheck supported syntax
+      precheckSyntax(wFn, wdwSpec);
+
       // 1. For Wdw Specs that refer to Window Defns, inherit missing components
       if ( wdwSpec != null ) {
         ArrayList<String> sources = new ArrayList<String>();
@@ -144,7 +147,15 @@ public class WindowingSpec {
       validateWindowFrame(wdwSpec);
 
       // 5. Add the Partition expressions as the Order if there is no Order and validate Order spec.
-      setAndValidateOrderSpec(wdwSpec);
+      setAndValidateOrderSpec(wFn, wdwSpec);
+    }
+  }
+
+  private void precheckSyntax(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException {
+    if (wdwSpec != null ) {
+      if (wFn.isDistinct && (wdwSpec.windowFrame != null || wdwSpec.getOrder() != null) ) {
+        throw new SemanticException("Function with DISTINCT cannot work with partition ORDER BY or windowing clause.");
+      }
     }
   }
 
@@ -274,8 +285,8 @@ public class WindowingSpec {
    * @param wdwSpec
    * @throws SemanticException
    */
-  private void setAndValidateOrderSpec(WindowSpec wdwSpec) throws SemanticException {
-    wdwSpec.ensureOrderSpec();
+  private void setAndValidateOrderSpec(WindowFunctionSpec wFn, WindowSpec wdwSpec) throws SemanticException {
+    wdwSpec.ensureOrderSpec(wFn);
 
     WindowFrameSpec wFrame = wdwSpec.getWindowFrame();
     OrderSpec order = wdwSpec.getOrder();
@@ -479,10 +490,13 @@ public class WindowingSpec {
      * Partition expressions when the OrderSpec is null; but for now we are setting up
      * an OrderSpec that copies the Partition expressions.
      */
-    protected void ensureOrderSpec() {
+    protected void ensureOrderSpec(WindowFunctionSpec wFn) throws SemanticException {
       if ( getOrder() == null ) {
         OrderSpec order = new OrderSpec();
         order.prefixBy(getPartition());
+        if (wFn.isDistinct) {
+          order.addExpressions(wFn.getArgs());
+        }
         setOrder(order);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
index eaf112e..f526c43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFCount.java
@@ -17,16 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.udf.generic;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
+import org.apache.hadoop.hive.serde2.lazy.LazyString;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 
 /**
  * This class implements the COUNT aggregation function as in SQL.
@@ -67,8 +72,11 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
       assert !paramInfo.isAllColumns() : "* not supported in expression list";
     }
 
-    return new GenericUDAFCountEvaluator().setCountAllColumns(
-        paramInfo.isAllColumns());
+    GenericUDAFCountEvaluator countEvaluator = new GenericUDAFCountEvaluator();
+    countEvaluator.setCountAllColumns(paramInfo.isAllColumns());
+    countEvaluator.setCountDistinct(paramInfo.isDistinct());
+
+    return countEvaluator;
   }
 
   /**
@@ -77,7 +85,9 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
    */
   public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
     private boolean countAllColumns = false;
+    private boolean countDistinct = false;
     private LongObjectInspector partialCountAggOI;
+    private ObjectInspector[] inputOI, outputOI;
     private LongWritable result;
 
     @Override
@@ -86,19 +96,27 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
       super.init(m, parameters);
       if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) {
         partialCountAggOI = (LongObjectInspector)parameters[0];
+      } else {
+        inputOI = parameters;
+        outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
+            ObjectInspectorCopyOption.JAVA);
       }
       result = new LongWritable(0);
       return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     }
 
-    private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) {
+    private void setCountAllColumns(boolean countAllCols) {
       countAllColumns = countAllCols;
-      return this;
+    }
+
+    private void setCountDistinct(boolean countDistinct) {
+      this.countDistinct = countDistinct;
     }
 
     /** class for storing count value. */
     @AggregationType(estimable = true)
     static class CountAgg extends AbstractAggregationBuffer {
+      Object[] prevColumns = null;    // Column values from previous row. Used to compare with current row for the case of COUNT(DISTINCT).
       long value;
       @Override
       public int estimate() { return JavaDataModel.PRIMITIVES2; }
@@ -113,6 +131,7 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
 
     @Override
     public void reset(AggregationBuffer agg) throws HiveException {
+      ((CountAgg) agg).prevColumns = null;
       ((CountAgg) agg).value = 0;
     }
 
@@ -134,6 +153,23 @@ public class GenericUDAFCount implements GenericUDAFResolver2 {
             break;
           }
         }
+
+        // Skip the counting if the values are the same for COUNT(DISTINCT) case
+        if (countThisRow && countDistinct) {
+          Object[] prevColumns = ((CountAgg) agg).prevColumns;
+          if (prevColumns == null) {
+            ((CountAgg) agg).prevColumns = new Object[parameters.length];
+          } else if (ObjectInspectorUtils.compare(parameters, inputOI, prevColumns, outputOI) == 0) {
+             countThisRow = false;
+          }
+
+          // We need to keep a copy of values from previous row.
+          if (countThisRow) {
+            ((CountAgg) agg).prevColumns = ObjectInspectorUtils.copyToStandardObject(
+                  parameters, inputOI, ObjectInspectorCopyOption.JAVA);
+          }
+        }
+
         if (countThisRow) {
           ((CountAgg) agg).value++;
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/test/queries/clientpositive/windowing_distinct.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/windowing_distinct.q b/ql/src/test/queries/clientpositive/windowing_distinct.q
new file mode 100644
index 0000000..94f4044
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/windowing_distinct.q
@@ -0,0 +1,30 @@
+drop table windowing_distinct;
+
+create table windowing_distinct(
+           index int,
+           t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal,
+           bin binary)
+       row format delimited
+       fields terminated by '|';
+
+load data local inpath '../../data/files/windowing_distinct.txt' into table windowing_distinct;
+
+
+SELECT COUNT(DISTINCT t) OVER (PARTITION BY index),
+       COUNT(DISTINCT d) OVER (PARTITION BY index),
+       COUNT(DISTINCT bo) OVER (PARTITION BY index),
+       COUNT(DISTINCT s) OVER (PARTITION BY index),
+       COUNT(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+       COUNT(DISTINCT ts) OVER (PARTITION BY index),
+       COUNT(DISTINCT dec) OVER (PARTITION BY index),
+       COUNT(DISTINCT bin) OVER (PARTITION BY index)
+FROM windowing_distinct;

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/ql/src/test/results/clientpositive/windowing_distinct.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/windowing_distinct.q.out b/ql/src/test/results/clientpositive/windowing_distinct.q.out
new file mode 100644
index 0000000..50f8ff8
--- /dev/null
+++ b/ql/src/test/results/clientpositive/windowing_distinct.q.out
@@ -0,0 +1,78 @@
+PREHOOK: query: drop table windowing_distinct
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table windowing_distinct
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table windowing_distinct(
+           index int,
+           t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal,
+           bin binary)
+       row format delimited
+       fields terminated by '|'
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@windowing_distinct
+POSTHOOK: query: create table windowing_distinct(
+           index int,
+           t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal,
+           bin binary)
+       row format delimited
+       fields terminated by '|'
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@windowing_distinct
+PREHOOK: query: load data local inpath '../../data/files/windowing_distinct.txt' into table windowing_distinct
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@windowing_distinct
+POSTHOOK: query: load data local inpath '../../data/files/windowing_distinct.txt' into table windowing_distinct
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@windowing_distinct
+PREHOOK: query: SELECT COUNT(DISTINCT t) OVER (PARTITION BY index),
+       COUNT(DISTINCT d) OVER (PARTITION BY index),
+       COUNT(DISTINCT bo) OVER (PARTITION BY index),
+       COUNT(DISTINCT s) OVER (PARTITION BY index),
+       COUNT(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+       COUNT(DISTINCT ts) OVER (PARTITION BY index),
+       COUNT(DISTINCT dec) OVER (PARTITION BY index),
+       COUNT(DISTINCT bin) OVER (PARTITION BY index)
+FROM windowing_distinct
+PREHOOK: type: QUERY
+PREHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT COUNT(DISTINCT t) OVER (PARTITION BY index),
+       COUNT(DISTINCT d) OVER (PARTITION BY index),
+       COUNT(DISTINCT bo) OVER (PARTITION BY index),
+       COUNT(DISTINCT s) OVER (PARTITION BY index),
+       COUNT(DISTINCT concat('Mr.', s)) OVER (PARTITION BY index),
+       COUNT(DISTINCT ts) OVER (PARTITION BY index),
+       COUNT(DISTINCT dec) OVER (PARTITION BY index),
+       COUNT(DISTINCT bin) OVER (PARTITION BY index)
+FROM windowing_distinct
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@windowing_distinct
+#### A masked pattern was here ####
+2	2	2	2	2	2	2	1
+2	2	2	2	2	2	2	1
+2	2	2	2	2	2	2	1
+2	2	2	2	2	2	2	2
+2	2	2	2	2	2	2	2
+2	2	2	2	2	2	2	2

http://git-wip-us.apache.org/repos/asf/hive/blob/27172bcb/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 7a13eb0..33e5357 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -145,6 +145,21 @@ public final class ObjectInspectorUtils {
   }
 
   /**
+   * Get the corresponding standard ObjectInspector array for an array of ObjectInspector.
+   */
+  public static ObjectInspector[] getStandardObjectInspector(ObjectInspector[] ois,
+      ObjectInspectorCopyOption objectInspectorOption) {
+    if (ois == null) return null;
+
+    ObjectInspector[] result = new ObjectInspector[ois.length];
+    for (int i = 0; i < ois.length; i++) {
+      result[i] = getStandardObjectInspector(ois[i], objectInspectorOption);
+    }
+
+    return result;
+  }
+
+  /**
    * Get the corresponding standard ObjectInspector for an ObjectInspector.
    *
    * The returned ObjectInspector can be used to inspect the standard object.
@@ -274,6 +289,23 @@ public final class ObjectInspectorUtils {
   }
 
   /**
+   * Returns a deep copy of an array of objects
+   */
+  public static Object[] copyToStandardObject(
+      Object[] o, ObjectInspector[] oi, ObjectInspectorCopyOption objectInspectorOption) {
+    if (o == null) return null;
+    assert(o.length == oi.length);
+
+    Object[] result = new Object[o.length];
+    for (int i = 0; i < o.length; i++) {
+      result[i] = ObjectInspectorUtils.copyToStandardObject(
+          o[i], oi[i], objectInspectorOption);
+    }
+
+    return result;
+  }
+
+  /**
    * Returns a deep copy of the Object o that can be scanned by a
    * StandardObjectInspector returned by getStandardObjectInspector(oi).
    */