You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by da...@apache.org on 2011/04/04 21:31:46 UTC
svn commit: r1088749 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/ test/org/apache/pig/test/data/GoldenFiles/
Author: daijy
Date: Mon Apr 4 19:31:46 2011
New Revision: 1088749
URL: http://svn.apache.org/viewvc?rev=1088749&view=rev
Log:
PIG-1866: Dereference a bag within a tuple does not work
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 4 19:31:46 2011
@@ -130,6 +130,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-1866: Dereference a bag within a tuple does not work (daijy)
+
PIG-1984: Worng stats shown when there are multiple stores but same file names (rding)
PIG-1893: Pig report input size -1 for empty input file (rding)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Apr 4 19:31:46 2011
@@ -2495,7 +2495,7 @@ public class MRCompiler extends PhyPlanV
// from the foreach after the package
POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
topPrj.setColumn(1);
- topPrj.setResultType(DataType.TUPLE);
+ topPrj.setResultType(DataType.BAG);
topPrj.setOverloaded(true);
fe2Plan.add(topPrj);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Mon Apr 4 19:31:46 2011
@@ -256,49 +256,59 @@ public class POProject extends Expressio
* @throws ExecException
*/
protected Result consumeInputBag(Result input) throws ExecException {
- DataBag inpBag = (DataBag) input.result;
- Result retVal = new Result();
- if(isInputAttached() || isStar()){
- retVal.result = inpBag;
- retVal.returnStatus = POStatus.STATUS_OK;
- detachInput();
- return retVal;
- }
-
- DataBag outBag;
- if(resultSingleTupleBag) {
- // we have only one tuple in a bag - so create
- // A SingleTupleBag for the result and fill it
- // appropriately from the input bag
- Tuple tuple = inpBag.iterator().next();
- if(!isProjectToEnd){
- ArrayList<Object> objList = new ArrayList<Object>(columns.size());
- for (int col : columns) {
- addColumn(objList, tuple, col);
- }
- outBag = new SingleTupleBag( tupleFactory.newTupleNoCopy(objList) );
- }else {
- Tuple tmpTuple = getRangeTuple(tuple);
- outBag = new SingleTupleBag(tmpTuple);
+ if (input.result instanceof DataBag) {
+ DataBag inpBag = (DataBag) input.result;
+ Result retVal = new Result();
+ if(isInputAttached() || isStar()){
+ retVal.result = inpBag;
+ retVal.returnStatus = POStatus.STATUS_OK;
+ detachInput();
+ return retVal;
}
- } else {
- outBag = bagFactory.newDefaultBag();
- for (Tuple tuple : inpBag) {
+
+ DataBag outBag;
+ if(resultSingleTupleBag) {
+ // we have only one tuple in a bag - so create
+ // A SingleTupleBag for the result and fill it
+ // appropriately from the input bag
+ Tuple tuple = inpBag.iterator().next();
if(!isProjectToEnd){
ArrayList<Object> objList = new ArrayList<Object>(columns.size());
for (int col : columns) {
addColumn(objList, tuple, col);
}
- outBag.add( tupleFactory.newTupleNoCopy(objList) );
- }else{
- Tuple outTuple = getRangeTuple(tuple);
- outBag.add(outTuple);
+ outBag = new SingleTupleBag( tupleFactory.newTupleNoCopy(objList) );
+ }else {
+ Tuple tmpTuple = getRangeTuple(tuple);
+ outBag = new SingleTupleBag(tmpTuple);
+ }
+ } else {
+ outBag = bagFactory.newDefaultBag();
+ for (Tuple tuple : inpBag) {
+ if(!isProjectToEnd){
+ ArrayList<Object> objList = new ArrayList<Object>(columns.size());
+ for (int col : columns) {
+ addColumn(objList, tuple, col);
+ }
+ outBag.add( tupleFactory.newTupleNoCopy(objList) );
+ }else{
+ Tuple outTuple = getRangeTuple(tuple);
+ outBag.add(outTuple);
+ }
}
}
+ retVal.result = outBag;
+ retVal.returnStatus = POStatus.STATUS_OK;
+ return retVal;
+ } else if (input.result instanceof Tuple) {
+ // if input is tuple, columns should only have one item
+ Result retVal = new Result();
+ retVal.result = ((Tuple)input.result).get(columns.get(0));
+ retVal.returnStatus = POStatus.STATUS_OK;
+ return retVal;
+ } else {
+ throw new ExecException("Cannot dereference a bag from " + input.result.getClass().getName(), 1129);
}
- retVal.result = outBag;
- retVal.returnStatus = POStatus.STATUS_OK;
- return retVal;
}
private Tuple getRangeTuple(Tuple tuple) throws ExecException {
@@ -548,8 +558,12 @@ public class POProject extends Expressio
//Should be removed once the model is clear
if(reporter!=null) reporter.progress();
- if(!isInputAttached())
- return inputs.get(0).getNext(dummyBag);
+ if(!isInputAttached()) {
+ if (inputs.get(0).getResultType()==DataType.BAG)
+ return inputs.get(0).getNext(dummyBag);
+ else
+ return inputs.get(0).getNext(dummyTuple);
+ }
else{
res.result = (DataBag)input.get(columns.get(0));
res.returnStatus = POStatus.STATUS_OK;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Mon Apr 4 19:31:46 2011
@@ -466,13 +466,7 @@ public class LogToPhyTranslationVisitor
LogicalSchema s = load.getSchema();
if (load.sourceIsBag()) {
- Operator succ = load.getPlan().getSuccessors(load).get(0);
- if (succ instanceof LOGenerate) {
- exprOp.setResultType(DataType.BAG);
- }
- else {
- exprOp.setResultType(DataType.TUPLE);
- }
+ exprOp.setResultType(DataType.BAG);
exprOp.setOverloaded(true);
}
else {
Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon Apr 4 19:31:46 2011
@@ -1416,4 +1416,23 @@ public class TestEvalPipeline2 {
// Shall not throw exception
pigServer.explain("f", System.out);
}
+
+ // See PIG-1866
+ @Test
+ public void testProjBagInTuple() throws Exception{
+ String[] input = {
+ "(1,{(one),(two)})",
+ };
+
+ Util.createInputFile(cluster, "table_testProjBagInTuple", input);
+
+ pigServer.registerQuery("a = load 'table_testProjBagInTuple' as (t : tuple(i: int, b1: bag { b_tuple : tuple ( b_str: chararray) }));");
+ pigServer.registerQuery("b = foreach a generate t.b1;");
+
+ Iterator<Tuple> iter = pigServer.openIterator("b");
+
+ Tuple t = iter.next();
+ Assert.assertTrue(t.toString().equals("({(one),(two)})"));
+ Assert.assertFalse(iter.hasNext());
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=1088749&r1=1088748&r2=1088749&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Mon Apr 4 19:31:46 2011
@@ -67,7 +67,7 @@ MapReduce(1,GFCross) - -29:
| | | |
| | | Project[tuple][*] - -10
| | |
- | | |---Project[tuple][1] - -9
+ | | |---Project[bag][1] - -9
| |
| |---Package[tuple]{chararray} - -8
| Local Rearrange[tuple]{chararray}(false) - -7
@@ -88,4 +88,4 @@ MapReduce(1,GFCross) - -29:
| | |
| | Constant(true) - --6860857982727545176
| |
- | |---Load(DummyFil:DummyLdr) - -737965411848544085
\ No newline at end of file
+ | |---Load(DummyFil:DummyLdr) - -737965411848544085