You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2011/09/21 23:47:31 UTC

svn commit: r1173880 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java src/org/apache/pig/builtin/COR.java test/e2e/pig/tests/nightly.conf

Author: daijy
Date: Wed Sep 21 21:47:30 2011
New Revision: 1173880

URL: http://svn.apache.org/viewvc?rev=1173880&view=rev
Log:
PIG-2286: Using COR function in Piggybank results in ERROR 2018: Internal error. Unable to introduce the combiner for optimization

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    pig/trunk/src/org/apache/pig/builtin/COR.java
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1173880&r1=1173879&r2=1173880&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 21 21:47:30 2011
@@ -257,6 +257,8 @@ PIG-2221: Couldnt find documentation for
 
 BUG FIXES
 
+PIG-2286: Using COR function in Piggybank results in ERROR 2018: Internal error. Unable to introduce the combiner for optimization (daijy)
+
 PIG-2270: Put jython.jar in classpath (daijy)
 
 PIG-2274: remove pig deb package dependency on sun-java6-jre (gkesavan via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1173880&r1=1173879&r2=1173880&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Wed Sep 21 21:47:30 2011
@@ -421,8 +421,16 @@ public class CombinerOptimizer extends M
                     }
                 }
 
-                algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
-
+                // The algebraic udf can have more than one input. Add the udf only once
+                boolean exist = false;
+                for (Pair<PhysicalOperator, PhysicalPlan> pair : algebraicOps) {
+                    if (pair.first.equals(combineUdf)) {
+                        exist = true;
+                        break;
+                    }
+                }
+                if (!exist)
+                    algebraicOps.add(new Pair<PhysicalOperator, PhysicalPlan>(combineUdf, pplan));
             }
         }
 

Modified: pig/trunk/src/org/apache/pig/builtin/COR.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/COR.java?rev=1173880&r1=1173879&r2=1173880&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/COR.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/COR.java Wed Sep 21 21:47:30 2011
@@ -142,15 +142,15 @@ public class COR extends EvalFunc<DataBa
         public Tuple exec(Tuple input) throws IOException {
             if (input == null || input.size() == 0)
                 return null;
-            Tuple output = TupleFactory.getInstance().newTuple(input.size() * 2); 
+            Tuple output = TupleFactory.getInstance().newTuple(input.size()*(input.size()-1)); 
             try {
                 int k = -1;
                 for(int i=0;i<input.size();i++){
                     for(int j=i+1;j<input.size();j++){
                         DataBag first = (DataBag)input.get(i);
                         DataBag second = (DataBag)input.get(j);
-                        output.set(k++, computeAll(first, second));
-                        output.set(k++, (Long)first.size());
+                        output.set(++k, computeAll(first, second));
+                        output.set(++k, (Long)first.size());
                     }
                 }
             } catch(Exception t) {
@@ -222,7 +222,7 @@ public class COR extends EvalFunc<DataBa
                             result.set(1, "var"+j);
                         }
                         Tuple tup = (Tuple)combined.get(count);
-                        double tempCount = (Double)combined.get(count+1);
+                        double tempCount = (Long)combined.get(count+1);
                         double sum_x_y = (Double)tup.get(0);
                         double sum_x = (Double)tup.get(1);
                         double sum_y = (Double)tup.get(2);
@@ -252,20 +252,19 @@ public class COR extends EvalFunc<DataBa
     static protected Tuple combine(DataBag values) throws IOException {
         Tuple output = TupleFactory.getInstance().newTuple();
         Tuple tuple; // copy of DataBag values 
-        tuple =  TupleFactory.getInstance().newTuple(values.size());
-        int ct=0;
+        tuple =  TupleFactory.getInstance().newTuple();
 
         try{
-            for (Iterator<Tuple> it = values.iterator(); it.hasNext();ct++) {
+            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                 Tuple t = it.next();
-                tuple.set(ct, t);
+                tuple.append(t);
             }
         }catch(Exception e){}
 
         try{
             int size = ((Tuple)tuple.get(0)).size();
             for(int i=0;i<size;i=i+2){
-                double count = 0;
+                long count = 0;
                 double sum_x_y = 0.0;
                 double sum_x = 0.0;
                 double sum_y = 0.0;
@@ -274,7 +273,7 @@ public class COR extends EvalFunc<DataBa
                 for(int j=0;j<tuple.size();j++){
                     Tuple temp = (Tuple)tuple.get(j);
                     Tuple tem = (Tuple)temp.get(i);
-                    count += (Double)temp.get(i+1);
+                    count += (Long)temp.get(i+1);
                     sum_x_y += (Double)tem.get(0);
                     sum_x += (Double)tem.get(1);
                     sum_y += (Double)tem.get(2);

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1173880&r1=1173879&r2=1173880&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Wed Sep 21 21:47:30 2011
@@ -3631,6 +3631,37 @@ store E into ':OUTPATH:';\, 
                                                 store b into ':OUTPATH:';?,
                     }
                 ],
+            },{
+                'name' => 'BugFix',
+                'tests' => [
+                    {
+                        # PIG-2286
+                        'num' => 1,
+                        'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double, gpa:double);
+                                B = group A all;
+                                C = foreach B generate group, COR(A.age, A.gpa);
+                                store C into ':OUTPATH:';?,
+                        'verify_pig_script' => q?set pig.exec.nocombiner true
+                                A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double);
+                                B = group A all;
+                                C = foreach B generate group, COR(A.age, A.gpa);
+                                store C into ':OUTPATH:';?,
+                    }, {
+                        # PIG-2286, with 3 inputs to COR
+                        'num' => 2,
+                        'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double);
+                                B = foreach A generate age, gpa, gpa*gpa as gpa2;
+                                C = group B all;
+                                D = foreach C generate group, COR(B.age, B.gpa, B.gpa2);
+                                store D into ':OUTPATH:';?,
+                        'verify_pig_script' => q?set pig.exec.nocombiner true
+                                A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double);
+                                B = foreach A generate age, gpa, gpa*gpa as gpa2;
+                                C = group B all;
+                                D = foreach C generate group, COR(B.age, B.gpa, B.gpa2);
+                                store D into ':OUTPATH:';?,
+                    }
+                ],
             },
         ],
     },