You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/08/14 01:47:35 UTC

svn commit: r685730 - in /incubator/pig/branches/types: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/

Author: olga
Date: Wed Aug 13 16:47:34 2008
New Revision: 685730

URL: http://svn.apache.org/viewvc?rev=685730&view=rev
Log:
PIG-311: cross is broken

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=685730&r1=685729&r2=685730&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Wed Aug 13 16:47:34 2008
@@ -149,3 +149,5 @@
     PIG-367: convinience function for UDFs to name schema
 
     PIG-368: making JobConf available to Load/Store UDFs
+
+    PIG-311: cross is broken

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=685730&r1=685729&r2=685730&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Aug 13 16:47:34 2008
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +33,10 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
@@ -41,6 +45,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.*;
@@ -503,6 +508,119 @@
     }
 
     @Override
+    protected void visit(LOCross cs) throws VisitorException {
+        String scope = cs.getOperatorKey().scope;
+        List<LogicalOperator> inputs = cs.getInputs();
+        
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)), cs
+                .getRequestedParallelism());
+        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), cs.getRequestedParallelism());
+
+        currentPlan.add(poGlobal);
+        currentPlan.add(poPackage);
+        
+        int count = 0;
+        
+        try {
+            currentPlan.connect(poGlobal, poPackage);
+            List<Boolean> flattenLst = Arrays.asList(true, true);
+            
+            for (LogicalOperator op : inputs) {
+                List<PhysicalOperator> pop = Arrays.asList(LogToPhyMap.get(op));
+                PhysicalPlan fep1 = new PhysicalPlan();
+                ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cs.getRequestedParallelism());
+                Tuple ce1val = TupleFactory.getInstance().newTuple(2);
+                ce1val.set(0,inputs.size());
+                ce1val.set(1,count);
+                ce1.setValue(ce1val);
+                ce1.setResultType(DataType.TUPLE);
+                
+                fep1.add(ce1);
+
+                POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cs.getRequestedParallelism(), Arrays.asList((PhysicalOperator)ce1), new FuncSpec(GFCross.class.getName()));
+                gfc.setResultType(DataType.BAG);
+                fep1.addAsLeaf(gfc);
+                
+                PhysicalPlan fep2 = new PhysicalPlan();
+                POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism());
+                feproj.setResultType(DataType.TUPLE);
+                feproj.setStar(true);
+                feproj.setOverloaded(false);
+                fep2.add(feproj);
+                List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
+                
+                POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), fePlans, flattenLst );
+                currentPlan.add(fe);
+                currentPlan.connect(LogToPhyMap.get(op), fe);
+                
+                POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                        scope, nodeGen.getNextNodeId(scope)), cs
+                        .getRequestedParallelism());
+                List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
+                for(int i=0;i<inputs.size();i++){
+                    PhysicalPlan lrp1 = new PhysicalPlan();
+                    POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), i);
+                    lrproj1.setOverloaded(false);
+                    lrproj1.setResultType(DataType.INTEGER);
+                    lrp1.add(lrproj1);
+                    lrPlans.add(lrp1);
+                }
+                
+                physOp.setCross(true);
+                physOp.setIndex(count++);
+                physOp.setKeyType(DataType.TUPLE);
+                physOp.setPlans(lrPlans);
+                physOp.setResultType(DataType.TUPLE);
+                
+                currentPlan.add(physOp);
+                currentPlan.connect(fe, physOp);
+                currentPlan.connect(physOp, poGlobal);
+            }
+        } catch (PlanException e1) {
+            log.error("Invalid physical operators in the physical plan"
+                    + e1.getMessage());
+            throw new VisitorException(e1);
+        } catch (ExecException e) {
+            log.error("Unable to create the constant tuple because " + e.getMessage());
+            throw new VisitorException(e);
+        }
+        
+        poPackage.setKeyType(DataType.TUPLE);
+        poPackage.setResultType(DataType.TUPLE);
+        poPackage.setNumInps(count);
+        boolean inner[] = new boolean[count];
+        for (int i=0;i<count;i++) {
+            inner[i] = true;
+        }
+        poPackage.setInner(inner);
+        
+        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+        List<Boolean> flattenLst = new ArrayList<Boolean>();
+        for(int i=1;i<=count;i++){
+            PhysicalPlan fep1 = new PhysicalPlan();
+            POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), i);
+            feproj1.setResultType(DataType.BAG);
+            feproj1.setOverloaded(false);
+            fep1.add(feproj1);
+            fePlans.add(fep1);
+            flattenLst.add(true);
+        }
+        
+        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cs.getRequestedParallelism(), fePlans, flattenLst );
+        currentPlan.add(fe);
+        try{
+            currentPlan.connect(poPackage, fe);
+        }catch (PlanException e1) {
+            log.error("Invalid physical operators in the physical plan"
+                    + e1.getMessage());
+            throw new VisitorException(e1);
+        }
+        LogToPhyMap.put(cs, fe);
+    }
+    
+    @Override
     public void visit(LOCogroup cg) throws VisitorException {
         boolean currentPhysicalPlan = false;
         String scope = cg.getOperatorKey().scope;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=685730&r1=685729&r2=685730&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Wed Aug 13 16:47:34 2008
@@ -64,6 +64,8 @@
     byte keyType;
 
     private boolean mIsDistinct = false;
+    
+    private boolean isCross = false;
 
     // A place holder IndexedTuple used in distinct case where we really don't
     // have any value to pass through.  But hadoop gets cranky if we pass a
@@ -228,6 +230,10 @@
             outPut.set(1, mFakeIndexedTuple);
             return outPut;
         } else {
+            if(isCross){
+                for(int i=0;i<plans.size();i++)
+                    value.getAll().remove(0);
+            }
             //Create the indexed tuple out of the value
             //that is remaining in the input tuple
             IndexedTuple it = new IndexedTuple(value, index);
@@ -284,5 +290,13 @@
         return clone;
     }
 
+    public boolean isCross() {
+        return isCross;
+    }
+
+    public void setCross(boolean isCross) {
+        this.isCross = isCross;
+    }
+
 
 }