You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink that this plain-text rendering does not preserve.
Posted to commits@pig.apache.org by da...@apache.org on 2011/04/04 21:31:46 UTC

svn commit: r1088749 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/ test/org/apache/pig/test/data/GoldenFiles/

Author: daijy
Date: Mon Apr  4 19:31:46 2011
New Revision: 1088749

URL: http://svn.apache.org/viewvc?rev=1088749&view=rev
Log:
PIG-1866: Dereference a bag within a tuple does not work

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr  4 19:31:46 2011
@@ -130,6 +130,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-1866: Dereference a bag within a tuple does not work (daijy)
+
 PIG-1984: Worng stats shown when there are multiple stores but same file names (rding)
 
 PIG-1893: Pig report input size -1 for empty input file (rding)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Apr  4 19:31:46 2011
@@ -2495,7 +2495,7 @@ public class MRCompiler extends PhyPlanV
         // from the foreach after the package
         POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         topPrj.setColumn(1);
-        topPrj.setResultType(DataType.TUPLE);
+        topPrj.setResultType(DataType.BAG);
         topPrj.setOverloaded(true);
         fe2Plan.add(topPrj);
         

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Mon Apr  4 19:31:46 2011
@@ -256,49 +256,59 @@ public class POProject extends Expressio
      * @throws ExecException 
      */
     protected Result consumeInputBag(Result input) throws ExecException {
-        DataBag inpBag = (DataBag) input.result;
-        Result retVal = new Result();
-        if(isInputAttached() || isStar()){
-            retVal.result = inpBag;
-            retVal.returnStatus = POStatus.STATUS_OK;
-            detachInput();
-            return retVal;
-        }
-        
-        DataBag outBag;
-        if(resultSingleTupleBag) {
-            // we have only one tuple in a bag - so create
-            // A SingleTupleBag for the result and fill it
-            // appropriately from the input bag
-            Tuple tuple = inpBag.iterator().next();
-            if(!isProjectToEnd){
-                ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
-                for (int col : columns) {
-                    addColumn(objList, tuple, col);
-                }
-                outBag = new SingleTupleBag( tupleFactory.newTupleNoCopy(objList) );
-            }else {
-                Tuple tmpTuple = getRangeTuple(tuple);
-                outBag = new SingleTupleBag(tmpTuple);
+        if (input.result instanceof DataBag) {
+            DataBag inpBag = (DataBag) input.result;
+            Result retVal = new Result();
+            if(isInputAttached() || isStar()){
+                retVal.result = inpBag;
+                retVal.returnStatus = POStatus.STATUS_OK;
+                detachInput();
+                return retVal;
             }
-        } else {
-            outBag = bagFactory.newDefaultBag();
-            for (Tuple tuple : inpBag) {
+            
+            DataBag outBag;
+            if(resultSingleTupleBag) {
+                // we have only one tuple in a bag - so create
+                // A SingleTupleBag for the result and fill it
+                // appropriately from the input bag
+                Tuple tuple = inpBag.iterator().next();
                 if(!isProjectToEnd){
                     ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
                     for (int col : columns) {
                         addColumn(objList, tuple, col);
                     }
-                    outBag.add( tupleFactory.newTupleNoCopy(objList) );
-                }else{
-                    Tuple outTuple = getRangeTuple(tuple);
-                    outBag.add(outTuple);
+                    outBag = new SingleTupleBag( tupleFactory.newTupleNoCopy(objList) );
+                }else {
+                    Tuple tmpTuple = getRangeTuple(tuple);
+                    outBag = new SingleTupleBag(tmpTuple);
+                }
+            } else {
+                outBag = bagFactory.newDefaultBag();
+                for (Tuple tuple : inpBag) {
+                    if(!isProjectToEnd){
+                        ArrayList<Object> objList = new ArrayList<Object>(columns.size()); 
+                        for (int col : columns) {
+                            addColumn(objList, tuple, col);
+                        }
+                        outBag.add( tupleFactory.newTupleNoCopy(objList) );
+                    }else{
+                        Tuple outTuple = getRangeTuple(tuple);
+                        outBag.add(outTuple);
+                    }
                 }
             }
+            retVal.result = outBag;
+            retVal.returnStatus = POStatus.STATUS_OK;
+            return retVal;
+        } else if (input.result instanceof Tuple) {
+            // if input is tuple, columns should only have one item
+            Result retVal = new Result();
+            retVal.result = ((Tuple)input.result).get(columns.get(0));
+            retVal.returnStatus = POStatus.STATUS_OK;
+            return retVal;
+        } else {
+            throw new ExecException("Cannot dereference a bag from " + input.result.getClass().getName(), 1129);
         }
-        retVal.result = outBag;
-        retVal.returnStatus = POStatus.STATUS_OK;
-        return retVal;
     }
 
     private Tuple getRangeTuple(Tuple tuple) throws ExecException {
@@ -548,8 +558,12 @@ public class POProject extends Expressio
         //Should be removed once the model is clear
         if(reporter!=null) reporter.progress();
         
-        if(!isInputAttached())
-            return inputs.get(0).getNext(dummyBag);
+        if(!isInputAttached()) {
+            if (inputs.get(0).getResultType()==DataType.BAG)
+                return inputs.get(0).getNext(dummyBag);
+            else
+                return inputs.get(0).getNext(dummyTuple);
+        }
         else{
             res.result = (DataBag)input.get(columns.get(0));
             res.returnStatus = POStatus.STATUS_OK;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Mon Apr  4 19:31:46 2011
@@ -466,13 +466,7 @@ public class LogToPhyTranslationVisitor 
         LogicalSchema s = load.getSchema();
 
         if (load.sourceIsBag()) {
-            Operator succ = load.getPlan().getSuccessors(load).get(0);
-            if (succ instanceof LOGenerate) {
-                exprOp.setResultType(DataType.BAG);
-            }
-            else {
-                exprOp.setResultType(DataType.TUPLE);
-            }
+            exprOp.setResultType(DataType.BAG);
             exprOp.setOverloaded(true);
         }
         else {

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Apr  4 19:31:46 2011
@@ -1416,4 +1416,23 @@ public class TestEvalPipeline2 {
         // Shall not throw exception
         pigServer.explain("f", System.out);
     }
+    
+    // See PIG-1866
+    @Test
+    public void testProjBagInTuple() throws Exception{
+        String[] input = {
+                "(1,{(one),(two)})",
+        };
+        
+        Util.createInputFile(cluster, "table_testProjBagInTuple", input);
+
+        pigServer.registerQuery("a = load 'table_testProjBagInTuple' as (t : tuple(i: int, b1: bag { b_tuple : tuple ( b_str: chararray) }));");
+        pigServer.registerQuery("b = foreach a generate t.b1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("b");
+        
+        Tuple t = iter.next();
+        Assert.assertTrue(t.toString().equals("({(one),(two)})"));
+        Assert.assertFalse(iter.hasNext());
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Mon Apr  4 19:31:46 2011
@@ -67,7 +67,7 @@ MapReduce(1,GFCross) - -29:
             |           |   |   |
             |           |   |   Project[tuple][*] - -10
             |           |   |
-            |           |   |---Project[tuple][1] - -9
+            |           |   |---Project[bag][1] - -9
             |           |
             |           |---Package[tuple]{chararray} - -8
             |   Local Rearrange[tuple]{chararray}(false) - -7
@@ -88,4 +88,4 @@ MapReduce(1,GFCross) - -29:
                 |       |   |
                 |       |   Constant(true) - --6860857982727545176
                 |       |
-                |       |---Load(DummyFil:DummyLdr) - -737965411848544085
\ No newline at end of file
+                |       |---Load(DummyFil:DummyLdr) - -737965411848544085