You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/08/15 00:53:53 UTC

svn commit: r804406 - in /hadoop/pig/trunk: ./ src/org/apache/pig/builtin/ test/org/apache/pig/test/

Author: olga
Date: Fri Aug 14 22:53:53 2009
New Revision: 804406

URL: http://svn.apache.org/viewvc?rev=804406&view=rev
Log:
PIG-892: Make COUNT and AVG deal with nulls in accordance with the SQL standard (olgan)

Added:
    hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEvalLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 14 22:53:53 2009
@@ -22,7 +22,9 @@
 
 INCOMPATIBLE CHANGES
 
-PIG-734:  Changed maps to only take strings as keys (gates).
+PIG-892: Make COUNT and AVG deal with nulls in accordance with the SQL standard
+(olgan)
+PIG-734: Changed maps to only take strings as keys (gates).
 
 IMPROVEMENTS
 

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java Fri Aug 14 22:53:53 2009
@@ -92,14 +92,17 @@
                     dba = (DataByteArray)tp.get(0);
                 }
                 t.set(0, dba != null ? Double.valueOf(dba.toString()) : null);
-                t.set(1, 1L);
+                if (dba == null)
+                    t.set(1, 0L);
+                else
+                    t.set(1, 1L);
                 return t;
             } catch(NumberFormatException nfe) {
                 // invalid input,
                 // treat this input as null
                 try {
                     t.set(0, null);
-                    t.set(1, 1L);
+                    t.set(1, 0L);
                 } catch (ExecException e) {
                     throw e;
                 }
@@ -198,7 +201,15 @@
 
     static protected long count(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        return values.size();
+        long cnt = 0; // number of tuples in the bag whose first field is non-null
+        Iterator it = values.iterator();
+        while (it.hasNext()){
+            Tuple t = (Tuple)it.next(); 
+            if (t != null && t.size() > 0 && t.get(0) != null) // skip null/empty tuples and null values
+                cnt ++;
+        }
+                    
+        return cnt;
     }
 
     static protected Double sum(Tuple input) throws ExecException, IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java Fri Aug 14 22:53:53 2009
@@ -43,7 +43,14 @@
     public Long exec(Tuple input) throws IOException {
         try {
             DataBag bag = (DataBag)input.get(0);
-            return bag.size();
+            Iterator it = bag.iterator();
+            long cnt = 0;
+            while (it.hasNext()){
+                    Tuple t = (Tuple)it.next();
+                    if (t != null && t.size() > 0 && t.get(0) != null )
+                            cnt++;
+            }
+            return cnt;
         } catch (ExecException ee) {
             throw ee;
         } catch (Exception e) {
@@ -74,8 +81,13 @@
             // input of a bag with a single tuple - the 
             // count should always be 1 if bag is non empty
             DataBag bag = (DataBag)input.get(0);
-            return mTupleFactory.newTuple(bag.iterator().hasNext()? 
-                    new Long(1) : new Long(0));
+            Iterator it = bag.iterator();
+            if (it.hasNext()){
+                Tuple t = (Tuple)it.next();
+                if (t != null && t.size() > 0 && t.get(0) != null)
+                    return mTupleFactory.newTuple(new Long(1));
+            }
+            return mTupleFactory.newTuple(new Long(0));
         }
     }
 

Added: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java?rev=804406&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT_STAR.java Fri Aug 14 22:53:53 2009
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
+
+/**
+ * Counts the tuples in the bag passed as the first field of the input tuple.
+ * Unlike COUNT, it does not skip tuples whose first field is NULL, so it
+ * implements SQL COUNT(*) semantics. This class is Algebraic in its
+ * implementation, so when possible execution is split into local and global phases.
+ */
+public class COUNT_STAR extends EvalFunc<Long> implements Algebraic{
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance(); // shared by the nested phase classes
+
+    @Override
+    public Long exec(Tuple input) throws IOException {
+        try {
+            DataBag bag = (DataBag)input.get(0);
+            return bag.size(); // counts every tuple, including ones with NULL fields
+        } catch (ExecException ee) {
+            throw ee;
+        } catch (Exception e) {
+            int errCode = 2106;                
+            String msg = "Error while computing count in " + this.getClass().getSimpleName();
+            throw new ExecException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    public String getInitial() { // class name of the initial phase
+        return Initial.class.getName();
+    }
+
+    public String getIntermed() { // class name of the intermediate phase
+        return Intermediate.class.getName();
+    }
+
+    public String getFinal() { // class name of the final phase
+        return Final.class.getName();
+    }
+
+    static public class Initial extends EvalFunc<Tuple> { // emits a one-field tuple holding 0L or 1L
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            // Since Initial is guaranteed to be called
+            // only in the map, it will be called with an
+            // input of a bag with a single tuple - the 
+            // count is 1 if the bag is non-empty, even when its field is NULL
+            DataBag bag = (DataBag)input.get(0);
+            return mTupleFactory.newTuple(bag.iterator().hasNext()? 
+                    Long.valueOf(1L) : Long.valueOf(0L));
+        }
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple> { // sums partial counts into a one-field tuple
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            try {
+                return mTupleFactory.newTuple(sum(input));
+            } catch (ExecException ee) {
+                throw ee;
+            } catch (Exception e) {
+                int errCode = 2106;                
+                String msg = "Error while computing count in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, e);
+            }
+        }
+    }
+
+    static public class Final extends EvalFunc<Long> { // sums partial counts into the final Long result
+        @Override
+        public Long exec(Tuple input) throws IOException {
+            try {
+                return sum(input);
+            } catch (Exception ee) { // also wraps ExecException, unlike Intermediate which rethrows it
+                int errCode = 2106;
+                String msg = "Error while computing count in " + this.getClass().getSimpleName();
+                throw new ExecException(msg, errCode, PigException.BUG, ee);
+            }
+        }
+    }
+
+    static protected Long sum(Tuple input) throws ExecException, NumberFormatException { // adds the Long first field of each tuple in the bag
+        DataBag values = (DataBag)input.get(0);
+        long sum = 0;
+        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+            Tuple t = it.next();
+            // Each value is assumed to be a non-null Long partial count; the check is
+            // skipped deliberately, so a NULL or non-Long value throws on the next line.
+            sum += (Long)t.get(0);
+        }
+        return sum;
+    }
+
+    @Override
+    public Schema outputSchema(Schema input) { // always a single unnamed LONG field
+        return new Schema(new Schema.FieldSchema(null, DataType.LONG)); 
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java Fri Aug 14 22:53:53 2009
@@ -89,7 +89,11 @@
                     d = (Double)(tp.get(0));
                 }
                 t.set(0, d);
-                t.set(1, 1L);
+                if (d != null){
+                    t.set(1, 1L);
+                }else{    
+                    t.set(1, 0L);
+                }
                 return t;
             } catch (ExecException ee) {
                 throw ee;
@@ -184,7 +188,14 @@
 
     static protected long count(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        return values.size();
+        Iterator it = values.iterator();
+        long cnt = 0; // number of tuples in the bag whose first field is non-null
+        while (it.hasNext()){
+            Tuple t = (Tuple)it.next();
+            if (t != null && t.size() > 0 && t.get(0) != null) // skip null/empty tuples and null values
+                cnt++;
+        }
+        return cnt;
     }
 
     static protected Double sum(Tuple input) throws ExecException, IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java Fri Aug 14 22:53:53 2009
@@ -86,7 +86,10 @@
                     f = (Float)(tp.get(0));
                 }
                 t.set(0, f != null ? new Double(f) : null);
-                t.set(1, 1L);
+                if (f != null)
+                    t.set(1, 1L);
+                else
+                    t.set(1, 0L);
                 return t;
             } catch (ExecException ee) {
                 throw ee;
@@ -181,7 +184,15 @@
 
     static protected long count(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        return values.size();
+        Iterator it = values.iterator();
+        long cnt = 0; // number of tuples in the bag whose first field is non-null
+        while (it.hasNext()){
+            Tuple t = (Tuple)it.next();
+            if (t != null && t.size() > 0 && t.get(0) != null) // skip null/empty tuples and null values
+                cnt++;
+        }
+
+        return cnt;
     }
 
     static protected Double sum(Tuple input) throws ExecException, IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java Fri Aug 14 22:53:53 2009
@@ -87,7 +87,10 @@
                     i = (Integer)tp.get(0);
                 }
                 t.set(0, i != null ? new Long(i): null);
-                t.set(1, 1L);
+                if (i != null)
+                    t.set(1, 1L);
+                else
+                    t.set(1, 0L);
                 return t;
             } catch (ExecException ee) {
                 throw ee;
@@ -182,7 +185,15 @@
 
     static protected long count(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        return values.size();
+        Iterator it = values.iterator();
+        long cnt = 0; // number of tuples in the bag whose first field is non-null
+        while (it.hasNext()){
+            Tuple t = (Tuple)it.next();
+            if (t != null && t.size() > 0 && t.get(0) != null) // skip null/empty tuples and null values
+                cnt++;
+        }
+
+        return cnt;
     }
 
     static protected Long sum(Tuple input) throws ExecException, IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java Fri Aug 14 22:53:53 2009
@@ -86,7 +86,10 @@
                     l = (Long)(tp.get(0));
                 }
                 t.set(0, l);
-                t.set(1, 1L);
+                if (l != null)
+                    t.set(1, 1L);
+                else
+                    t.set(1, 0L);
                 return t;
             } catch (ExecException ee) {
                 throw ee;
@@ -181,7 +184,15 @@
 
     static protected long count(Tuple input) throws ExecException {
         DataBag values = (DataBag)input.get(0);
-        return values.size();
+        Iterator it = values.iterator();
+        long cnt = 0; // number of tuples in the bag whose first field is non-null
+        while (it.hasNext()){
+            Tuple t = (Tuple)it.next();
+            if (t != null && t.size() > 0 && t.get(0) != null) // skip null/empty tuples and null values
+                cnt++;
+        }
+
+        return cnt;
     }
 
     static protected Long sum(Tuple input) throws ExecException, IOException {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java Fri Aug 14 22:53:53 2009
@@ -204,6 +204,7 @@
                         // for half the groups
                         // emit nulls
                         ps.println(j%10 + ":");
+                        groupsize--;
                     } else {
                         ps.println(j%10 + ":" + j);
                     }
@@ -223,8 +224,6 @@
                 Double group = Double.valueOf(a.toString());
                 if(group == 0.0) {
                     Long count = DataType.toLong(t.get(1));
-                    // right now count with nulls is same as
-                    // count without nulls
                     assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], groupsize, count.longValue());                    
                 }
             }
@@ -237,19 +236,21 @@
         for (int i = 0; i < nullFlags.length; i++) {
             PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
             long groupsize = 0;
+            long nonNullCnt = 0;
             if(nullFlags[i] == false) {
                 // generate data without nulls
                 for(int j = 0; j < LOOP_COUNT; j++) {
-                    if(j%10 == 0) groupsize++;
+                    if(j%10 == 0) {groupsize++; nonNullCnt++;}
                     ps.println(j%10 + ":" + j);
                 }
             } else {
                 // generate data with nulls
                 for(int j = 0; j < LOOP_COUNT; j++) {
-                    if(j%10 == 0) groupsize++;
+                    if(j%10 == 0) {groupsize++; nonNullCnt++;}
                     if(j % 20 == 0) {
                         // for half the groups
                         // emit nulls
+                        nonNullCnt--;
                         ps.println(j%10 + ":");
                     } else {
                         ps.println(j%10 + ":" + j);
@@ -257,7 +258,8 @@
                 }
             }
             ps.close();
-            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1), COUNT($1.$0) ;";
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + 
+                           "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1), COUNT($1.$0) ;";
             System.out.println(query);
             pig.registerQuery(query);
             Iterator it = pig.openIterator("myid");
@@ -269,12 +271,10 @@
                 String a = t.get(0).toString();
                 Double group = Double.valueOf(a.toString());
                 if(group == 0.0) {
-                    // right now count with nulls is same
-                    // as count without nulls
                     Long count = DataType.toLong(t.get(2));
                     assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
                     count = DataType.toLong(t.get(1));
-                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
+                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],nonNullCnt, count.longValue());
                 }
             }
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEvalLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEvalLocal.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEvalLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestAlgebraicEvalLocal.java Fri Aug 14 22:53:53 2009
@@ -204,13 +204,15 @@
                         // for half the groups
                         // emit nulls
                         ps.println(j%10 + ":");
+                        groupsize --;
                     } else {
                         ps.println(j%10 + ":" + j);
                     }
                 }
             }         
             ps.close();
-            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1) ;";
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + 
+                           PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1) ;";
             System.out.println(query);
             pig.registerQuery(query);
             Iterator it = pig.openIterator("myid");
@@ -223,8 +225,6 @@
                 Double group = Double.valueOf(a.toString());
                 if(group == 0.0) {
                     Long count = DataType.toLong(t.get(1));
-                    // right now count with nulls is same as
-                    // count without nulls
                     assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], groupsize, count.longValue());                    
                 }
             }
@@ -237,27 +237,30 @@
         for (int i = 0; i < nullFlags.length; i++) {
             PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
             long groupsize = 0;
+            long nonNullCnt = 0;
             if(nullFlags[i] == false) {
                 // generate data without nulls
                 for(int j = 0; j < LOOP_COUNT; j++) {
-                    if(j%10 == 0) groupsize++;
+                    if(j%10 == 0) {groupsize++; nonNullCnt++;}
                     ps.println(j%10 + ":" + j);
                 }
             } else {
                 // generate data with nulls
                 for(int j = 0; j < LOOP_COUNT; j++) {
-                    if(j%10 == 0) groupsize++;
+                    if(j%10 == 0) {groupsize++; nonNullCnt++;}
                     if(j % 20 == 0) {
                         // for half the groups
                         // emit nulls
                         ps.println(j%10 + ":");
+                        nonNullCnt--;
                     } else {
                         ps.println(j%10 + ":" + j);
                     }
                 }
             }
             ps.close();
-            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1), COUNT($1.$0) ;";
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + 
+                           PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1), COUNT($1.$0) ;";
             System.out.println(query);
             pig.registerQuery(query);
             Iterator it = pig.openIterator("myid");
@@ -269,12 +272,10 @@
                 String a = t.get(0).toString();
                 Double group = Double.valueOf(a.toString());
                 if(group == 0.0) {
-                    // right now count with nulls is same
-                    // as count without nulls
                     Long count = DataType.toLong(t.get(2));
                     assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
                     count = DataType.toLong(t.get(1));
-                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
+                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],nonNullCnt, count.longValue());
                 }
             }
         }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Fri Aug 14 22:53:53 2009
@@ -125,11 +125,11 @@
         expectedMap.put("LongSum", new Long(145776964666362L));
         expectedMap.put("FloatSum", new Double(56.15395));
 
-        expectedMap.put("AVG", new Double(5.0));
-        expectedMap.put("DoubleAvg", new Double(15.506126530417545));
-        expectedMap.put("LongAvg", new Double(1.3252451333305637E13));
-        expectedMap.put("IntAvg", new Double(5.0));
-        expectedMap.put("FloatAvg", new Double(5.104904507723722));
+        expectedMap.put("AVG", new Double(5.5));
+        expectedMap.put("DoubleAvg", new Double(17.0567391834593));
+        expectedMap.put("LongAvg", new Double(14577696466636.2));
+        expectedMap.put("IntAvg", new Double(5.5));
+        expectedMap.put("FloatAvg", new Double(5.615394958853722));
         
         expectedMap.put("MIN", new Double(1));
         expectedMap.put("IntMin", new Integer(1));
@@ -145,7 +145,7 @@
         expectedMap.put("DoubleMax", new Double(121.0));
         expectedMap.put("StringMax", "unit");
         
-        expectedMap.put("COUNT", new Long(11));
+        expectedMap.put("COUNT", new Long(10));
 
         // set up allowedInput
         for (String[] aggGroups : aggs) {
@@ -726,7 +726,10 @@
             for (Tuple t: bg) {
                 Tuple newTuple = tupleFactory.newTuple(2);
                 newTuple.set(0, t.get(0));
-                newTuple.set(1, new Long(1));
+                if ( t.get(0) == null)
+                    newTuple.set(1, new Long(0));
+                else
+                    newTuple.set(1, new Long(1));
                 if(i < 5) {
                     bg1.add(newTuple);
                 } else {
@@ -752,11 +755,10 @@
         }    
     }
 
-
     @Test
     public void testCOUNT() throws Exception {
-        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
-        long expected = input.length;
+        Integer input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, null };
+        long expected = input.length - 1;
 
         EvalFunc<Long> count = new COUNT();
         Tuple tup = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), input);
@@ -767,12 +769,12 @@
 
     @Test
     public void testCOUNTIntermed() throws Exception {
-        int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+        Integer input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
         
         DataBag intermediateInputBag = bagFactory.newDefaultBag();
         // call initial and then Intermed
-        for (int i : input) {
-            Tuple t = tupleFactory.newTuple(new Integer(i));
+        for (Integer i : input) {
+            Tuple t = tupleFactory.newTuple(i);
             DataBag b = bagFactory.newDefaultBag();
             b.add(t);
             Tuple initialInput = tupleFactory.newTuple(b);
@@ -800,6 +802,52 @@
     }
 
     @Test
+    public void testCOUNT_STAR() throws Exception {
+        Integer input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, null };
+        long expected = input.length; // the null element is counted too, unlike COUNT
+
+        EvalFunc<Long> count = new COUNT_STAR();
+        Tuple tup = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), input);
+        Long output = count.exec(tup);
+
+        assertTrue(output == expected); // Long is unboxed here, so this compares values
+    }
+
+    @Test
+    public void testCOUNT_STARIntermed() throws Exception { // Initial per value, then one Intermediate over the partials
+        Integer input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+        
+        DataBag intermediateInputBag = bagFactory.newDefaultBag();
+        // call initial and then Intermed
+        for (Integer i : input) {
+            Tuple t = tupleFactory.newTuple(i);
+            DataBag b = bagFactory.newDefaultBag();
+            b.add(t);
+            Tuple initialInput = tupleFactory.newTuple(b);
+            EvalFunc<?> initial = new COUNT_STAR.Initial();
+            intermediateInputBag.add((Tuple)initial.exec(initialInput));
+        }
+
+        EvalFunc<Tuple> countIntermed = new COUNT_STAR.Intermediate();
+        Tuple intermediateInput = tupleFactory.newTuple(intermediateInputBag);
+        Tuple output = countIntermed.exec(intermediateInput);
+
+        Long f1 = DataType.toLong(output.get(0));
+        assertEquals("Expected count to be 10", 10, f1.longValue());
+    }
+
+    @Test
+    public void testCOUNT_STARFinal() throws Exception {
+        long input[] = { 23, 38, 39 }; // partial counts: 23 + 38 + 39 = 100
+        Tuple tup = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), input);
+
+        EvalFunc<Long> count = new COUNT_STAR.Final();
+        Long output = count.exec(tup);
+
+        assertEquals("Expected count to be 100", 100, output.longValue());
+    }
+
+    @Test
     public void testSUM() throws Exception {
         String[] sumTypes = {"SUM", "DoubleSum", "LongSum", "IntSum", "FloatSum"};
         for(int k = 0; k < sumTypes.length; k++) {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLocal.java Fri Aug 14 22:53:53 2009
@@ -96,16 +96,18 @@
         int LOOP_COUNT = 4*1024;
         File tmpFile = File.createTempFile( this.getName(), ".txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        long nonNullCnt = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
         if ( i % 10 == 0 ){
                ps.println("");
         } else {
                ps.println(i);
+               nonNullCnt ++;
         }
         }
         ps.close();
 
-        assertEquals( new Double( LOOP_COUNT ), bigGroupAll( tmpFile) );
+        assertEquals( new Double( nonNullCnt ), bigGroupAll( tmpFile) );
 
         tmpFile.delete();
 

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java?rev=804406&r1=804405&r2=804406&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMapReduce.java Fri Aug 14 22:53:53 2009
@@ -95,16 +95,18 @@
         int LOOP_COUNT = 4*1024;
         File tmpFile = File.createTempFile( this.getName(), ".txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        long nonNullCnt = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
 	    if ( i % 10 == 0 ){
                ps.println("");
 	    } else {
                ps.println(i);
+               nonNullCnt ++;
 	    }
         }
         ps.close();
 
-        assertEquals( new Double( LOOP_COUNT ), bigGroupAll( tmpFile) );
+        assertEquals( new Double( nonNullCnt ), bigGroupAll( tmpFile) );
 
         tmpFile.delete();