You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/25 00:55:24 UTC

svn commit: r707776 - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physi...

Author: olga
Date: Fri Oct 24 15:55:24 2008
New Revision: 707776

URL: http://svn.apache.org/viewvc?rev=707776&view=rev
Log:
PIG-508: problems with double joins

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Oct 24 15:55:24 2008
@@ -301,3 +301,5 @@
     PIG-499: parser issue with as (sms via olgan)
 
     PIG-507: permission error not reported (pradeepk via olgan)
+
+    PIG-508: problem with double joins (pradeepk via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Oct 24 15:55:24 2008
@@ -809,7 +809,8 @@
             
             POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
             pkg.setKeyType(DataType.TUPLE);
-            pkg.setNumInps(0);
+            pkg.setDistinct(true);
+            pkg.setNumInps(1);
             boolean[] inner = {false}; 
             pkg.setInner(inner);
             curMROp.reducePlan.add(pkg);

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Fri Oct 24 15:55:24 2008
@@ -58,7 +58,6 @@
         // OR in the reduce plan. POPostCombinerPackage could
         // be present only in the reduce plan. Search in these two
         // plans accordingly
-        
         if(!mr.combinePlan.isEmpty()) {
             PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.combinePlan);
             pkgDiscoverer.visit();
@@ -76,7 +75,7 @@
                 // if the POPackage is actually a POPostCombinerPackage, then we should
                 // just look for the corresponding LocalRearrange(s) in the combine plan
                 if(pkg instanceof POPostCombinerPackage) {
-                    if(!patchPackage(mr.combinePlan, pkg)) {
+                    if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
                         throw new VisitorException("Unexpected problem while trying " +
                         		"to optimize (could not find LORearrange in combine plan)");
                     }
@@ -91,26 +90,31 @@
     private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
         // the LocalRearrange(s) could either be in the map of this MapReduceOper
         // OR in the reduce of predecessor MapReduceOpers
-        if(!patchPackage(mr.mapPlan, pkg)) {
+        int lrFound = 0;
+        
+        lrFound = patchPackage(mr.mapPlan, pkg);
+        if(lrFound != pkg.getNumInps()) {
             // we did not find the LocalRearrange(s) in the map plan
             // let's look in the predecessors
             List<MapReduceOper> preds = this.mPlan.getPredecessors(mr);
             for (Iterator<MapReduceOper> it = preds.iterator(); it.hasNext();) {
                 MapReduceOper mrOper = it.next();
-                if(!patchPackage(mrOper.reducePlan, pkg)) {
-                    throw new VisitorException("Unexpected problem while trying " +
-                            "to optimize (could not find LORearrange in predecessor's reduce plan)");
+                lrFound += patchPackage(mrOper.reducePlan, pkg);
+                if(lrFound == pkg.getNumInps()) {
+                    break;
                 }     
             }
         }
+        if(lrFound != pkg.getNumInps())
+            throw new VisitorException("Unexpected problem while trying to optimize (Could not find all LocalRearranges)");
     }
 
-    private boolean patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
+    private int patchPackage(PhysicalPlan plan, POPackage pkg) throws VisitorException {
         LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(plan, pkg);
         lrDiscoverer.visit();
         // let our caller know if we managed to patch
         // the package
-        return lrDiscoverer.isLoRearrangeFound();
+        return lrDiscoverer.getLoRearrangeFound();
     }
     
     /**
@@ -161,7 +165,7 @@
      */
     class LoRearrangeDiscoverer extends PhyPlanVisitor {
         
-        private boolean loRearrangeFound = false;
+        private int loRearrangeFound = 0;
         private POPackage pkg;
         
         public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
@@ -174,14 +178,22 @@
          */
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
-            loRearrangeFound = true;
+            loRearrangeFound++;
             Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
             // annotate the package with information from the LORearrange
             // update the keyInfo information if already present in the POPackage
             keyInfo = pkg.getKeyInfo();
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+            
+            if(keyInfo.get(lrearrange.getIndex()) != null) {
+                // something is wrong - we should not be getting key info 
+                // for the same index from two different Local Rearranges
+                throw new VisitorException("Unexpected problem while trying " +
+                                "to optimize (found same index:" + lrearrange.getIndex() + 
+                                " in multiple Local Rearrange operators");
                 
+            }
             keyInfo.put(new Integer(lrearrange.getIndex()), 
                 new Pair<Boolean, Map<Integer, Integer>>(
                         lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
@@ -192,7 +204,7 @@
         /**
          * @return the loRearrangeFound
          */
-        public boolean isLoRearrangeFound() {
+        public int getLoRearrangeFound() {
             return loRearrangeFound;
         }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Oct 24 15:55:24 2008
@@ -84,6 +84,10 @@
     //on a particular input
     boolean[] inner;
     
+    // flag to denote whether there is a distinct
+    // leading to this package
+    protected boolean distinct = false;
+    
     // A mapping of input index to key information got from LORearrange
     // for that index. The Key information is a pair of boolean, Map.
     // The boolean indicates whether there is a lone project(*) in the 
@@ -182,97 +186,36 @@
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        //Create numInputs bags
-        DataBag[] dbs = null;
-        if (numInputs > 0) {
+        Tuple res;
+        if(distinct) {
+            // only set the key which has the whole
+            // tuple 
+            res = mTupleFactory.newTuple(1);
+            res.set(0, key);
+        } else {
+            //Create numInputs bags
+            DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
             for (int i = 0; i < numInputs; i++) {
                 dbs[i] = mBagFactory.newDefaultBag();
             }
-        }
-        
-        //For each indexed tup in the inp, sort them
-        //into their corresponding bags based
-        //on the index
-        while (tupIter.hasNext()) {
-            NullableTuple ntup = tupIter.next();
-            // Need to make a copy of the value, as hadoop uses the same ntup
-            // to represent each value.
-            Tuple val = (Tuple)ntup.getValueAsPigType();
-            /*
-            Tuple copy = mTupleFactory.newTuple(val.size());
-            for (int i = 0; i < val.size(); i++) {
-                copy.set(i, val.get(i));
-            }
-            */
             
-            Tuple copy = null;
-            // The "value (val)" that we just got may not
-            // be the complete "value". It may have some portions
-            // in the "key" (look in POLocalRearrange for more comments)
-            // If this is the case we need to stitch
-            // the "value" together.
-            int index = ntup.getIndex();
-            Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-                keyInfo.get(index);
-            boolean isProjectStar = lrKeyInfo.first;
-            Map<Integer, Integer> keyLookup = lrKeyInfo.second;
-            int keyLookupSize = keyLookup.size();
-
-            if( keyLookupSize > 0) {
-            
-                // we have some fields of the "value" in the
-                // "key".
-                copy = mTupleFactory.newTuple();
-                int finalValueSize = keyLookupSize + val.size();
-                int valIndex = 0; // an index for accessing elements from 
-                                  // the value (val) that we have currently
-                for(int i = 0; i < finalValueSize; i++) {
-                    Integer keyIndex = keyLookup.get(i);
-                    if(keyIndex == null) {
-                        // the field for this index is not in the
-                        // key - so just take it from the "value"
-                        // we were handed
-                        copy.append(val.get(valIndex));
-                        valIndex++;
-                    } else {
-                        // the field for this index is in the key
-                        if(isKeyTuple) {
-                            // the key is a tuple, extract the
-                            // field out of the tuple
-                            copy.append(keyAsTuple.get(keyIndex));
-                        } else {
-                            copy.append(key);
-                        }
-                    }
-                }
-                
-            } else if (isProjectStar) {
-                
-                log.info("In project star, keyAsTuple:" + keyAsTuple);
-                // the whole "value" is present in the "key"
-                copy = mTupleFactory.newTuple(keyAsTuple.getAll());
-                
-            } else {
-                
-                // there is no field of the "value" in the
-                // "key" - so just make a copy of what we got
-                // as the "value"
-                copy = mTupleFactory.newTuple(val.getAll());
-                
+            //For each indexed tup in the inp, sort them
+            //into their corresponding bags based
+            //on the index
+            while (tupIter.hasNext()) {
+                NullableTuple ntup = tupIter.next();
+                int index = ntup.getIndex();
+                Tuple copy = getValueTuple(ntup, index);            
+                dbs[index].add(copy);
+                if(reporter!=null) reporter.progress();
             }
             
-            if (numInputs > 0) dbs[index].add(copy);
-            if(reporter!=null) reporter.progress();
-        }
-        
-        //Construct the output tuple by appending
-        //the key and all the above constructed bags
-        //and return it.
-        Tuple res;
-        res = mTupleFactory.newTuple(numInputs+1);
-        res.set(0,key);
-        if (numInputs > 0) {
+            //Construct the output tuple by appending
+            //the key and all the above constructed bags
+            //and return it.
+            res = mTupleFactory.newTuple(numInputs+1);
+            res.set(0,key);
             int i=-1;
             for (DataBag bag : dbs) {
                 if(inner[++i]){
@@ -293,6 +236,73 @@
         return r;
     }
 
+    protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException {
+     // Need to make a copy of the value, as hadoop uses the same ntup
+        // to represent each value.
+        Tuple val = (Tuple)ntup.getValueAsPigType();
+        /*
+        Tuple copy = mTupleFactory.newTuple(val.size());
+        for (int i = 0; i < val.size(); i++) {
+            copy.set(i, val.get(i));
+        }
+        */
+        
+        Tuple copy = null;
+        // The "value (val)" that we just got may not
+        // be the complete "value". It may have some portions
+        // in the "key" (look in POLocalRearrange for more comments)
+        // If this is the case we need to stitch
+        // the "value" together.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+            keyInfo.get(index);
+        boolean isProjectStar = lrKeyInfo.first;
+        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+        int keyLookupSize = keyLookup.size();
+
+        if( keyLookupSize > 0) {
+        
+            // we have some fields of the "value" in the
+            // "key".
+            copy = mTupleFactory.newTuple();
+            int finalValueSize = keyLookupSize + val.size();
+            int valIndex = 0; // an index for accessing elements from 
+                              // the value (val) that we have currently
+            for(int i = 0; i < finalValueSize; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if(keyIndex == null) {
+                    // the field for this index is not in the
+                    // key - so just take it from the "value"
+                    // we were handed
+                    copy.append(val.get(valIndex));
+                    valIndex++;
+                } else {
+                    // the field for this index is in the key
+                    if(isKeyTuple) {
+                        // the key is a tuple, extract the
+                        // field out of the tuple
+                        copy.append(keyAsTuple.get(keyIndex));
+                    } else {
+                        copy.append(key);
+                    }
+                }
+            }
+            
+        } else if (isProjectStar) {
+            
+            // the whole "value" is present in the "key"
+            copy = mTupleFactory.newTuple(keyAsTuple.getAll());
+            
+        } else {
+            
+            // there is no field of the "value" in the
+            // "key" - so just make a copy of what we got
+            // as the "value"
+            copy = mTupleFactory.newTuple(val.getAll());
+            
+        }
+        return copy;
+    }
+    
     public byte getKeyType() {
         return keyType;
     }
@@ -340,5 +350,19 @@
         return keyInfo;
     }
 
+    /**
+     * @return the distinct
+     */
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    /**
+     * @param distinct the distinct to set
+     */
+    public void setDistinct(boolean distinct) {
+        this.distinct = distinct;
+    }
+
 
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=707776&r1=707775&r2=707776&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Oct 24 15:55:24 2008
@@ -49,6 +49,7 @@
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
 
 import junit.framework.TestCase;
 
@@ -659,4 +660,37 @@
         assertEquals(1, t.get(2));
         assertEquals(Integer.class, t.get(2).getClass());
     }
+    
+    @Test
+    public void testCogroupWithInputFromGroup() throws IOException, ExecException {
+        // Create input file with ascii data
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"pigtester\t10\t1.2", "pigtester\t15\t1.2", 
+                "pigtester2\t10\t1.2",
+                "pigtester3\t10\t1.2", "pigtester3\t20\t1.2", "pigtester3\t30\t1.2"});
+        
+        Map<String, Pair<Long, Long>> resultMap = new HashMap<String, Pair<Long, Long>>();
+        // we will in essence be doing a group on first column and getting
+        // SUM over second column and a count for the group - store
+        // the results for the three groups above so we can check the output
+        resultMap.put("pigtester", new Pair<Long, Long>(25L, 2L));
+        resultMap.put("pigtester2", new Pair<Long, Long>(10L, 1L));
+        resultMap.put("pigtester3", new Pair<Long, Long>(60L, 3L));
+        
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+                "as (name:chararray, age:int, gpa:double);");
+        pigServer.registerQuery("b = group a by name;");
+        pigServer.registerQuery("c = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+        "as (name:chararray, age:int, gpa:double);");
+        pigServer.registerQuery("d = cogroup b by group, c by name;");
+        pigServer.registerQuery("e = foreach d generate flatten(group), SUM(c.age), COUNT(c.name);");
+        Iterator<Tuple> it = pigServer.openIterator("e");
+        for(int i = 0; i < resultMap.size(); i++) {
+            Tuple t = it.next();
+            assertEquals(true, resultMap.containsKey(t.get(0)));
+            Pair<Long, Long> output = resultMap.get(t.get(0)); 
+            assertEquals(output.first, t.get(1));
+            assertEquals(output.second, t.get(2));
+        }
+    }
 }