Posted to commits@pig.apache.org by ga...@apache.org on 2009/01/08 02:18:29 UTC

svn commit: r732581 [1/2] - in /hadoop/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/execu...

Author: gates
Date: Wed Jan  7 17:18:29 2009
New Revision: 732581

URL: http://svn.apache.org/viewvc?rev=732581&view=rev
Log:
PIG-554 Added fragment replicate map side join.

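A fragment replicate (FR) join designates one join input as the "fragment"
and assumes every other input is small enough to be replicated and held in
memory. Each replicated input is loaded into a hash table keyed on the join
key, and the join then happens map-side while the fragment is streamed, with
no shuffle or reduce phase for the join itself. A minimal, self-contained
sketch of that core idea (plain Java with made-up data; not the Pig
implementation in the diff below):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FRJoinSketch {
        public static void main(String[] args) {
            // Replicated (small) input: build an in-memory index key -> rows.
            String[][] small = {{"1", "apple"}, {"2", "banana"}, {"1", "avocado"}};
            Map<String, List<String[]>> replicate = new HashMap<String, List<String[]>>();
            for (String[] row : small) {
                if (!replicate.containsKey(row[0]))
                    replicate.put(row[0], new ArrayList<String[]>());
                replicate.get(row[0]).add(row);
            }
            // Fragment (large) input: stream it and probe the hash table.
            String[][] fragment = {{"1", "red"}, {"2", "yellow"}, {"3", "green"}};
            for (String[] row : fragment) {
                List<String[]> matches = replicate.get(row[0]);
                if (matches == null) continue;       // no match for this key
                for (String[] m : matches)           // cross product per key
                    System.out.println(Arrays.toString(row) + " x " + Arrays.toString(m));
            }
        }
    }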

Added:
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestFRJoin.java
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/src/org/apache/pig/PigServer.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java
    hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
    hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
    hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Wed Jan  7 17:18:29 2009
@@ -6,6 +6,8 @@
 
   NEW FEATURES
 
+    PIG-554 Added fragment replicate map side join (shravanmn via pkamath and gates)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/PigServer.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/PigServer.java Wed Jan  7 17:18:29 2009
@@ -369,7 +369,7 @@
         }
     }
 
-    public void dumpSchema(String alias) throws IOException{
+    public Schema dumpSchema(String alias) throws IOException{
         try {
             LogicalPlan lp = getPlanFromAlias(alias, "describe");
             try {
@@ -380,6 +380,7 @@
             Schema schema = lp.getLeaves().get(0).getSchema();
             if (schema != null) System.out.println(alias + ": " + schema.toString());    
             else System.out.println("Schema for " + alias + " unknown.");
+            return schema;
         } catch (FrontendException fe) {
             throw WrappedIOException.wrap(
                 "Unable to describe schema for alias " + alias, fe);

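With this change a caller can capture the schema programmatically instead of
only seeing it printed. A hedged usage sketch (the script and input file are
hypothetical):

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class DescribeExample {
        public static void main(String[] args) throws Exception {
            PigServer server = new PigServer(ExecType.LOCAL);
            server.registerQuery("A = load 'input.txt' as (x:int, y:chararray);");
            // dumpSchema still prints the describe output, but now also
            // returns the Schema (or null if it is unknown).
            Schema schema = server.dumpSchema("A");
            if (schema != null)
                System.out.println("A has " + schema.size() + " fields");
        }
    }
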
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Jan  7 17:18:29 2009
@@ -47,6 +47,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -778,7 +779,80 @@
         }
     }
     
-    
+    /**
+     * This operator has multiple inputs (one per join input), but it prunes
+     * off all inputs except the fragment input. It creates a separate MR job
+     * for each replicated input and uses the outputs of those jobs as the
+     * replicated files configured in the POFRJoin operator. It also marks
+     * this as an FRJoin job and sets some parameters associated with it.
+     */
+    @Override
+    public void visitFRJoin(POFRJoin op) throws VisitorException {
+        try{
+            FileSpec[] replFiles = new FileSpec[op.getInputs().size()];
+            for (int i=0; i<replFiles.length; i++) {
+                if(i==op.getFragment()) continue;
+                replFiles[i] = getTempFileSpec();
+            }
+            op.setReplFiles(replFiles);
+            
+            List<OperatorKey> opKeys = new ArrayList<OperatorKey>(op.getInputs().size());
+            for (PhysicalOperator pop : op.getInputs()) {
+                opKeys.add(pop.getOperatorKey());
+            }
+            int fragPlan = 0;
+            for(int i=0;i<compiledInputs.length;i++){
+                MapReduceOper mro = compiledInputs[i];
+                OperatorKey opKey = (!mro.isMapDone()) ?  mro.mapPlan.getLeaves().get(0).getOperatorKey()
+                                                       :  mro.reducePlan.getLeaves().get(0).getOperatorKey();
+                if(opKeys.indexOf(opKey)==op.getFragment()){
+                    curMROp = mro;
+                    fragPlan = i;
+                    continue;
+                }
+                POStore str = getStore();
+                str.setSFile(replFiles[opKeys.indexOf(opKey)]);
+                if (!mro.isMapDone()) {
+                    mro.mapPlan.addAsLeaf(str);
+                    mro.setMapDoneSingle(true);
+                } else if (mro.isMapDone() && !mro.isReduceDone()) {
+                    mro.reducePlan.addAsLeaf(str);
+                    mro.setReduceDone(true);
+                } else {
+                    log.error("Both map and reduce phases have been done. This is unexpected while compiling!");
+                    throw new PlanException("Both map and reduce phases have been done. This is unexpected while compiling!");
+                }
+            }
+            for(int i=0;i<compiledInputs.length;i++){
+                if(i==fragPlan) continue;
+                MRPlan.connect(compiledInputs[i], curMROp);
+            }
+            
+            if (!curMROp.isMapDone()) {
+                curMROp.mapPlan.addAsLeaf(op);
+            } else if (curMROp.isMapDone() && !curMROp.isReduceDone()) {
+                curMROp.reducePlan.addAsLeaf(op);
+            } else {
+                log.error("Both map and reduce phases have been done. This is unexpected while compiling!");
+                throw new PlanException("Both map and reduce phases have been done. This is unexpected while compiling!");
+            }
+            List<List<PhysicalPlan>> joinPlans = op.getJoinPlans();
+            if(joinPlans!=null)
+                for (List<PhysicalPlan> joinPlan : joinPlans) {
+                    if(joinPlan!=null)
+                        for (PhysicalPlan plan : joinPlan) {
+                            addUDFs(plan);
+                        }
+                }
+            curMROp.setFrjoin(true);
+            curMROp.setFragment(op.getFragment());
+            curMROp.setReplFiles(op.getReplFiles());
+        }catch(Exception e){
+            VisitorException pe = new VisitorException(e.getMessage());
+            pe.initCause(e);
+            throw pe;
+        }
+    }
 
     @Override
     public void visitDistinct(PODistinct op) throws VisitorException {

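The net effect of visitFRJoin is to split an n-way FR join across MR jobs:
each replicated input gets its own job whose output is stored to a temp
file, while the join operator itself is placed in the job that carries the
fragment, which runs only after the replicated jobs finish. A toy model of
that wiring decision (hypothetical names, not the Pig classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FRJoinPlanSketch {
        public static void main(String[] args) {
            List<String> inputs = Arrays.asList("A", "B", "C");
            int fragment = 0; // index of the fragment input, as in POFRJoin
            List<String> replJobs = new ArrayList<String>();
            String joinJob = null;
            for (int i = 0; i < inputs.size(); i++) {
                if (i == fragment) {
                    // The fragment input stays in the join job itself.
                    joinJob = "job(" + inputs.get(i) + " -> FRJoin)";
                } else {
                    // Each replicated input becomes a job storing a temp file.
                    replJobs.add("job(" + inputs.get(i) + " -> store tmp" + i + ")");
                }
            }
            // The replicated jobs must complete before the join job starts.
            System.out.println(replJobs + " then " + joinJob);
        }
    }
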
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Jan  7 17:18:29 2009
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
@@ -97,6 +98,11 @@
 
     private String scope;
     
+    //Fragment Replicate Join State
+    boolean frjoin = false;
+    FileSpec[] replFiles = null;
+    int fragment = -1;
+    
     int requestedParallelism = -1;
     
     // Last POLimit value in this map reduce operator, needed by LimitAdjuster
@@ -293,4 +299,28 @@
     public void setStreamInReduce(boolean streamInReduce) {
         this.streamInReduce = streamInReduce;
     }
+    
+    public int getFragment() {
+        return fragment;
+    }
+
+    public void setFragment(int fragment) {
+        this.fragment = fragment;
+    }
+
+    public boolean isFrjoin() {
+        return frjoin;
+    }
+
+    public void setFrjoin(boolean frjoin) {
+        this.frjoin = frjoin;
+    }
+
+    public FileSpec[] getReplFiles() {
+        return replFiles;
+    }
+
+    public void setReplFiles(FileSpec[] replFiles) {
+        this.replFiles = replFiles;
+    }
 }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Jan  7 17:18:29 2009
@@ -702,6 +702,71 @@
         poPackage.setInner(cg.getInner());
         LogToPhyMap.put(cg, poPackage);
     }
+    
+    
+    /**
+     * Creates the inner plans used to configure the Local Rearrange operators (ppLists),
+     * extracts the key types, and creates the POFRJoin operator.
+     */
+    @Override
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        String scope = frj.getOperatorKey().scope;
+        List<LogicalOperator> inputs = frj.getInputs();
+        List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+        List<Byte> keyTypes = new ArrayList<Byte>();
+        
+        int fragment = findFrag(inputs,frj.getFragOp());
+        List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+        for (LogicalOperator op : inputs) {
+            inp.add(LogToPhyMap.get(op));
+            List<LogicalPlan> plans = (List<LogicalPlan>) frj.getJoinColPlans()
+                    .get(op);
+            
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add((PhysicalPlan) currentPlan);
+                popWalker();
+
+            }
+            currentPlan = currentPlans.pop();
+            ppLists.add(exprPlans);
+            
+            if (plans.size() > 1) {
+                keyTypes.add(DataType.TUPLE);
+            } else {
+                keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType());
+            }
+        }
+        POFRJoin pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),frj.getRequestedParallelism(),
+                                    inp, ppLists, keyTypes, null, fragment);
+        pfrj.setResultType(DataType.TUPLE);
+        currentPlan.add(pfrj);
+        for (LogicalOperator op : inputs) {
+            try {
+                currentPlan.connect(LogToPhyMap.get(op), pfrj);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+        }
+        LogToPhyMap.put(frj, pfrj);
+    }
+
+    private int findFrag(List<LogicalOperator> inputs, LogicalOperator fragOp) {
+        int i=-1;
+        for (LogicalOperator lop : inputs) {
+            ++i;
+            if(fragOp.getOperatorKey().equals(lop.getOperatorKey()))
+                return i;
+        }
+        return -1;
+    }
 
     @Override
     public void visit(LOFilter filter) throws VisitorException {

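The key-type selection above can be traced on concrete queries. A hedged
worked example (queries and field types assumed for illustration):

    // join A by $0,        B by $0;        -- one inner plan per input:
    //   key type per input = the plan's result type (e.g. DataType.INTEGER)
    // join A by ($0, $1),  B by ($0, $1);  -- several inner plans per input:
    //   key type per input = DataType.TUPLE
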
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Wed Jan  7 17:18:29 2009
@@ -219,6 +219,10 @@
     public void visitLimit(POLimit lim) throws VisitorException{
         //do nothing
     }
+    
+    public void visitFRJoin(POFRJoin join) throws VisitorException {
+        //do nothing
+    }
 
     /**
      * @param stream

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Wed Jan  7 17:18:29 2009
@@ -150,6 +150,14 @@
         else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));
         }
+        else if(node instanceof POFRJoin){
+            POFRJoin frj = (POFRJoin)node;
+            List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
+            if(joinPlans!=null)
+                for (List<PhysicalPlan> list : joinPlans) {
+                    sb.append(planString(list));
+                }
+        }
         
         List<O> originalPredecessors = mPlan.getPredecessors(node);
         if (originalPredecessors == null)

Added: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=732581&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Wed Jan  7 17:18:29 2009
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+/**
+ * This operator models the join keys using Local Rearrange operators, which
+ * are configured with the plans specified by the user. It also sets up
+ * one Hashtable per replicated input, which maps a key (k), stored as a Tuple,
+ * to a DataBag holding all the values in that input with the same key (k).
+ * getNext() reads an input from its predecessor and separates it into
+ * key and value. It configures a foreach operator with the databags obtained
+ * from each Hashtable for the key, and with the value for the fragment input.
+ * It then returns the tuples produced by this foreach operator.
+ */
+public class POFRJoin extends PhysicalOperator {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    private Log log = LogFactory.getLog(getClass());
+    //The index in the input list which denotes the fragmented input
+    private int fragment;
+    //There can be n inputs each being a List<PhysicalPlan>
+    //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
+    private List<List<PhysicalPlan>> phyPlanLists;
+    //The key type for each Local Rearrange operator
+    private List<Byte> keyTypes;
+    //The Local Rearrange operators modeling the join key
+    private POLocalRearrange[] LRs;
+    //The set of files that represent the replicated inputs
+    private FileSpec[] replFiles;
+    //Used to configure the foreach operator
+    private ConstantExpression[] constExps;
+    //Used to produce the cross product of various bags
+    private POForEach fe;
+    //The array of Hashtables one per replicated input. replicates[fragment] = null
+    private Map<Tuple,List<Tuple>> replicates[];
+    //variable which denotes whether we are returning tuples from the foreach operator
+    private boolean processingPlan;
+    //A dummy tuple
+    private Tuple dumTup = TupleFactory.getInstance().newTuple(1);
+    //An instance of tuple factory
+    private transient TupleFactory mTupleFactory;
+    private transient BagFactory mBagFactory;
+    private boolean setUp;
+    
+    public POFRJoin(OperatorKey k) throws PlanException {
+        this(k,-1,null, null, null, null, -1);
+    }
+
+    public POFRJoin(OperatorKey k, int rp) throws PlanException {
+        this(k, rp, null, null, null, null, -1);
+    }
+
+    public POFRJoin(OperatorKey k, List<PhysicalOperator> inp) throws PlanException {
+        this(k, -1, inp, null, null, null, -1);
+    }
+
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) throws PlanException {
+        this(k,rp,inp,null, null, null, -1);
+    }
+    
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<Byte> keyTypes, FileSpec[] replFiles, int fragment){
+        super(k,rp,inp);
+        
+        phyPlanLists = ppLists;
+        this.fragment = fragment;
+        this.keyTypes = keyTypes;
+        this.replFiles = replFiles;
+        replicates = new Map[ppLists.size()];
+        LRs = new POLocalRearrange[ppLists.size()];
+        constExps = new ConstantExpression[ppLists.size()];
+        createJoinPlans(k);
+        processingPlan = false;
+        mTupleFactory = TupleFactory.getInstance();
+        mBagFactory = BagFactory.getInstance();
+    }
+    
+    public List<List<PhysicalPlan>> getJoinPlans(){
+        return phyPlanLists;
+    }
+    
+    private OperatorKey genKey(OperatorKey old){
+        return new OperatorKey(old.scope,NodeIdGenerator.getGenerator().getNextNodeId(old.scope));
+    }
+    
+    /**
+     * Configures the Local Rearrange operators and the foreach operator.
+     * @param old the operator key whose scope is used to generate new keys
+     */
+    private void createJoinPlans(OperatorKey old){
+        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+        List<Boolean> flatList = new ArrayList<Boolean>();
+        
+        int i=-1;
+        for (List<PhysicalPlan> ppLst : phyPlanLists) {
+            ++i;
+            POLocalRearrange lr = new POLocalRearrange(genKey(old));
+            lr.setIndex(i);
+            lr.setResultType(DataType.TUPLE);
+            lr.setKeyType(keyTypes.get(i));
+            lr.setPlans(ppLst);
+            LRs[i]= lr;
+            ConstantExpression ce = new ConstantExpression(genKey(old));
+            ce.setResultType((i==fragment)?DataType.TUPLE:DataType.BAG);
+            constExps[i] = ce;
+            PhysicalPlan pp = new PhysicalPlan();
+            pp.add(ce);
+            fePlans.add(pp);
+            flatList.add(true);
+        }
+        fe = new POForEach(genKey(old),-1,fePlans,flatList);
+    }
+    
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitFRJoin(this);
+    }
+
+    @Override
+    public String name() {
+        return "FRJoin[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result res = null;
+        Result inp = null;
+        if(!setUp){
+            setUpHashMap();
+            setUp = true;
+        }
+        if(processingPlan){
+            //Return tuples from the for each operator
+            //Assumes that it is configured appropriately with
+            //the bags for the current key.
+            while(true) {
+                res = fe.getNext(dummyTuple);
+                
+                if(res.returnStatus==POStatus.STATUS_OK){
+                    return res;
+                }
+                if(res.returnStatus==POStatus.STATUS_EOP){
+                    processingPlan = false;
+                    break;
+                }
+                if(res.returnStatus==POStatus.STATUS_ERR) {
+                    return res;
+                }
+                if(res.returnStatus==POStatus.STATUS_NULL) {
+                    continue;
+                }
+            }
+        }
+        while (true) {
+            //Process the current input
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR)
+                return inp;
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+            
+            //Separate Key & Value using the fragment's LR operator
+            POLocalRearrange lr = LRs[fragment];
+            lr.attachInput((Tuple)inp.result);
+            Result lrOut = lr.getNext(dummyTuple);
+            if(lrOut.returnStatus!=POStatus.STATUS_OK) {
+                log.error("LocalRearrange isn't configured right or is not working");
+                return new Result();
+            }
+            Tuple lrOutTuple = (Tuple) lrOut.result;
+            Tuple key = TupleFactory.getInstance().newTuple(1);
+            key.set(0,lrOutTuple.get(1));
+            Tuple value = getValueTuple(lr, lrOutTuple);
+            
+            //Configure the for each operator with the relevant bags
+            int i=-1;
+            boolean noMatch = false;
+            for (ConstantExpression ce : constExps) {
+                ++i;
+                if(i==fragment){
+                    ce.setValue(value);
+                    continue;
+                }
+                Map<Tuple, List<Tuple>> replicate = replicates[i];
+                if(!replicate.containsKey(key)){
+                    noMatch = true;
+                    break;
+                }
+                ce.setValue(mBagFactory.newDefaultBag(replicate.get(key)));
+            }
+            if(noMatch)
+                continue;
+            fe.attachInput(dumTup);
+            processingPlan = true;
+            
+            Result gn = getNext(dummyTuple);
+            return gn;
+        }
+    }
+
+    /**
+     * Builds the HashMaps by reading each replicated input from the DFS
+     * using a Load operator
+     * @throws ExecException
+     */
+    private void setUpHashMap() throws ExecException {
+        int i=-1;
+        long time1 = System.currentTimeMillis();
+        for (FileSpec replFile : replFiles) {
+            ++i;
+            if(i==fragment){
+                replicates[i] = null;
+                continue;
+            }
+
+            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
+            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+            pc.connect();
+            ld.setPc(pc);
+            POLocalRearrange lr = LRs[i];
+            lr.setInputs(Arrays.asList((PhysicalOperator)ld));
+            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(1000);
+            log.debug("Completed setup. Trying to build replication hash table");
+            int cnt = 0;
+            for(Result res=lr.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(dummyTuple)){
+                ++cnt;
+                if(reporter!=null) reporter.progress();
+                Tuple tuple = (Tuple) res.result;
+                Tuple key = mTupleFactory.newTuple(1);
+                key.set(0,tuple.get(1));
+                Tuple value = getValueTuple(lr, tuple);
+                if(!replicate.containsKey(key))
+                    replicate.put(key, new ArrayList<Tuple>());
+                replicate.get(key).add(value);
+            }
+            replicates[i] = replicate;
+
+        }
+        long time2 = System.currentTimeMillis();
+        log.debug("Hash Table built. Time taken: " + (time2-time1));
+    }
+    
+    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+        is.defaultReadObject();
+        mTupleFactory = TupleFactory.getInstance();
+        mBagFactory = BagFactory.getInstance();
+//        setUpHashMap();
+    }
+    
+    /*
+     * Extracts the value tuple from the LR operator's output tuple
+     */
+    private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple) throws ExecException {
+        Tuple val = (Tuple) tuple.get(2);
+        Tuple retTup = null;
+        boolean isProjectStar = lr.isProjectStar();
+        Map<Integer, Integer> keyLookup = lr.getProjectedColsMap();
+        int keyLookupSize = keyLookup.size();
+        Object key = tuple.get(1);
+        boolean isKeyTuple = lr.isKeyTuple();
+        Tuple keyAsTuple = isKeyTuple ? (Tuple)tuple.get(1) : null;
+        if( keyLookupSize > 0) {
+            
+            // we have some fields of the "value" in the
+            // "key".
+            retTup = mTupleFactory.newTuple();
+            int finalValueSize = keyLookupSize + val.size();
+            int valIndex = 0; // an index for accessing elements from 
+                              // the value (val) that we have currently
+            for(int i = 0; i < finalValueSize; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if(keyIndex == null) {
+                    // the field for this index is not in the
+                    // key - so just take it from the "value"
+                    // we were handed
+                    retTup.append(val.get(valIndex));
+                    valIndex++;
+                } else {
+                    // the field for this index is in the key
+                    if(isKeyTuple) {
+                        // the key is a tuple, extract the
+                        // field out of the tuple
+                        retTup.append(keyAsTuple.get(keyIndex));
+                    } else {
+                        retTup.append(key);
+                    }
+                }
+            }
+            
+        } else if (isProjectStar) {
+            
+            // the whole "value" is present in the "key"
+            retTup = mTupleFactory.newTuple(keyAsTuple.getAll());
+            
+        } else {
+            
+            // there is no field of the "value" in the
+            // "key" - so just make a copy of what we got
+            // as the "value"
+            retTup = mTupleFactory.newTuple(val.getAll());
+            
+        }
+        return retTup;
+    }
+
+    public int getFragment() {
+        return fragment;
+    }
+
+    public void setFragment(int fragment) {
+        this.fragment = fragment;
+    }
+
+    public FileSpec[] getReplFiles() {
+        return replFiles;
+    }
+
+    public void setReplFiles(FileSpec[] replFiles) {
+        this.replFiles = replFiles;
+    }
+}

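getValueTuple() above undoes a POLocalRearrange optimization: columns that
already appear in the key are not duplicated in the value, so the full value
tuple has to be reassembled from both parts. A self-contained sketch of that
reconstruction (plain Java; the field layout is assumed for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ValueRebuildSketch {
        public static void main(String[] args) {
            // Original row (k, v1, v2): the key column k was projected out of
            // the stored value, leaving (v1, v2), and keyLookup records that
            // field 0 of the reconstructed row comes from the key.
            List<Object> val = Arrays.<Object>asList("v1", "v2");
            Object key = "k";
            Map<Integer, Integer> keyLookup = new HashMap<Integer, Integer>();
            keyLookup.put(0, 0); // row field 0 <- key component 0

            List<Object> rebuilt = new ArrayList<Object>();
            int valIndex = 0;
            int finalSize = keyLookup.size() + val.size();
            for (int i = 0; i < finalSize; i++) {
                if (keyLookup.containsKey(i)) {
                    rebuilt.add(key);                 // field lives in the key
                } else {
                    rebuilt.add(val.get(valIndex++)); // field lives in the value
                }
            }
            System.out.println(rebuilt); // prints [k, v1, v2]
        }
    }
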
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Wed Jan  7 17:18:29 2009
@@ -77,6 +77,7 @@
     
     public POLoad(OperatorKey k, int rp, FileSpec lFile,boolean splittable) {
         super(k, rp);
+        this.lFile = lFile;
         this.splittable = splittable;
     }
     

Modified: hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java Wed Jan  7 17:18:29 2009
@@ -22,6 +22,7 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Comparator;
+import java.util.List;
 
 import org.apache.pig.impl.util.SpillableMemoryManager;
 
@@ -84,6 +85,12 @@
     public abstract DataBag newDefaultBag();
 
     /**
+     * Get a default (unordered, not distinct) data bag from
+     * an existing list of tuples.
+     */
+    public abstract DataBag newDefaultBag(List<Tuple> listOfTuples);
+    
+    /**
      * Get a sorted data bag.
      * @param comp Comparator that controls how the data is sorted.
      * If null, default comparator will be used.

Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java Wed Jan  7 17:18:29 2009
@@ -18,6 +18,7 @@
 package org.apache.pig.data;
 
 import java.util.Comparator;
+import java.util.List;
 
 import org.apache.pig.impl.util.SpillableMemoryManager;
 
@@ -33,6 +34,18 @@
         registerBag(b);
         return b;
     }
+    
+    /**
+     * Get a default (unordered, not distinct) data bag from
+     * an existing list of tuples. Note that the bag does NOT
+     * copy the tuples but uses the provided list as its backing store.
+     * So it takes ownership of the list.
+     */
+    public DataBag newDefaultBag(List<Tuple> listOfTuples) {
+        DataBag b = new DefaultDataBag(listOfTuples);
+        registerBag(b);
+        return b;
+    }
 
     /**
      * Get a sorted data bag.

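The ownership caveat matters in practice: the new bag aliases the caller's
list rather than copying it. A short sketch using the method added above
(assumes the Pig jar on the classpath):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class BagFromListExample {
        public static void main(String[] args) throws Exception {
            List<Tuple> tuples = new ArrayList<Tuple>();
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "hello");
            tuples.add(t);
            DataBag bag = BagFactory.getInstance().newDefaultBag(tuples);
            // The bag does not copy the list: mutating 'tuples' afterwards
            // would also mutate the bag, so hand the list over and drop it.
            System.out.println(bag.size()); // prints 1
        }
    }
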
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Wed Jan  7 17:18:29 2009
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.io.FileNotFoundException;
 
 import org.apache.commons.logging.Log;
@@ -51,6 +52,16 @@
         mContents = new ArrayList<Tuple>();
     }
 
+    /**
+     * This constructor creates a bag out of an existing list
+     * of tuples by taking ownership of the list and NOT
+     * copying the contents of the list.
+     * @param listOfTuples List<Tuple> containing the tuples
+     */
+    public DefaultDataBag(List<Tuple> listOfTuples) {
+        mContents = listOfTuples;
+    }
+
     public boolean isSorted() {
         return false;
     }

Added: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=732581&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Wed Jan  7 17:18:29 2009
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * This is the logical operator for the Fragment Replicate Join.
+ * It holds the user-specified information and is responsible for
+ * the schema computation. It mimics the LOCogroup operator except
+ * for the schema computation.
+ */
+public class LOFRJoin extends LogicalOperator {
+    private static final long serialVersionUID = 2L;
+    
+//    private boolean[] mIsInner;
+    private static Log log = LogFactory.getLog(LOFRJoin.class);
+    private MultiMap<LogicalOperator, LogicalPlan> mJoinColPlans;
+    private LogicalOperator fragOp;
+    
+    public LOFRJoin(
+            LogicalPlan plan,
+            OperatorKey k,
+            MultiMap<LogicalOperator, LogicalPlan> joinColPlans,
+            boolean[] isInner, LogicalOperator fragOp) {
+        super(plan, k);
+        mJoinColPlans = joinColPlans;
+//        mIsInner = isInner;
+        this.fragOp = fragOp;
+    }
+
+    @Override
+    /**
+     * Uses the schemas from its input operators, de-duplicates
+     * fields that have the same alias, and sets the
+     * schema for the join.
+     */
+    public Schema getSchema() throws FrontendException {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        mType = DataType.BAG;//mType is from the super class
+        Hashtable<String, Integer> nonDuplicates = new Hashtable<String, Integer>();
+        if(!mIsSchemaComputed){
+            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+            int i=-1;
+            for (LogicalOperator op : inputs) {
+                try {
+                    Schema cSchema = op.getSchema();
+                    if(cSchema!=null){
+                        
+                        for (FieldSchema schema : cSchema.getFields()) {
+                            ++i;
+                            if(nonDuplicates.containsKey(schema.alias)) {
+                                if(nonDuplicates.get(schema.alias)!=-1) {
+                                    nonDuplicates.remove(schema.alias);
+                                    nonDuplicates.put(schema.alias, -1);
+                                }
+                            } else {
+                                nonDuplicates.put(schema.alias, i);
+                            }
+                            FieldSchema newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+                            newFS.setParent(schema.canonicalName, op);
+                            fss.add(newFS);
+                        }
+                    }
+                    else
+                        fss.add(new FieldSchema(null,DataType.BYTEARRAY));
+                } catch (FrontendException ioe) {
+                    mIsSchemaComputed = false;
+                    mSchema = null;
+                    throw ioe;
+                }
+            }
+            mIsSchemaComputed = true;
+            for (Entry<String, Integer> ent : nonDuplicates.entrySet()) {
+                int ind = ent.getValue();
+                if(ind==-1) continue;
+                FieldSchema prevSch = fss.get(ind);
+                fss.set(ind, new FieldSchema(ent.getKey(),prevSch.schema,prevSch.type));
+            }
+            mSchema = new Schema(fss);
+        }
+        return mSchema;
+    }
+
+    public MultiMap<LogicalOperator, LogicalPlan> getJoinColPlans() {
+        return mJoinColPlans;
+    }
+    
+    public void switchJoinColPlanOp(LogicalOperator oldOp,
+            LogicalOperator newOp) {
+        Collection<LogicalPlan> innerPlans = mJoinColPlans.removeKey(oldOp) ;
+        mJoinColPlans.put(newOp, innerPlans);
+        if(fragOp.getOperatorKey().equals(oldOp.getOperatorKey()))
+            fragOp = newOp;
+    }
+    
+    public void unsetSchema() throws VisitorException{
+        for(LogicalOperator input: getInputs()) {
+            Collection<LogicalPlan> grpPlans = mJoinColPlans.get(input);
+            if(grpPlans!=null)
+                for(LogicalPlan plan : grpPlans) {
+                    SchemaRemover sr = new SchemaRemover(plan);
+                    sr.visit();
+                }
+        }
+        super.unsetSchema();
+    }
+    
+    public List<LogicalOperator> getInputs() {
+        return mPlan.getPredecessors(this);
+    }
+    
+    @Override
+    public void visit(LOVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public String name() {
+        return "FRJoin " + mKey.scope + "-" + mKey.id;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    public LogicalOperator getFragOp() {
+        return fragOp;
+    }
+
+    public void setFragOp(LogicalOperator fragOp) {
+        this.fragOp = fragOp;
+    }
+    
+    public boolean isTupleJoinCol() {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        if (inputs == null || inputs.size() == 0) {
+            throw new AssertionError("isTupleJoinCol() can only be called "
+                                     + "after the join has at least one input") ;
+        }
+        // NOTE: we depend on the number of inner plans to determine
+        // if the join col is a tuple. This could be an issue when there
+        // is only one inner plan with Project(*). For that case if the
+        // corresponding input to the Project had a schema then the front end
+        // would translate the single Project(*) (through ProjectStarTranslator)
+        // to many individual Projects. So the number of inner plans would then
+        // be > 1 BEFORE reaching here. For the Project(*) case when the corresponding
+        // input for the Project has no schema, treating it as an atomic col join
+        // does not cause any problems since no casts need to be inserted in that case
+        // anyway.
+        return mJoinColPlans.get(inputs.get(0)).size() > 1 ;
+    }
+    public byte getAtomicJoinColType() throws FrontendException {
+        if (isTupleJoinCol()) {
+            throw new FrontendException("getAtomicJoinColType is used only when"
+                                     + " dealing with atomic join col") ;
+        }
+
+        byte joinColType = DataType.BYTEARRAY ;
+        // merge all the inner plan outputs so we know what type
+        // our join column should be
+        for(int i=0;i < getInputs().size(); i++) {
+            LogicalOperator input = getInputs().get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(getJoinColPlans().get(input)) ;
+            if (innerPlans.size() != 1) {
+                throw new FrontendException("Each join input has to have "
+                                         + "the same number of inner plans") ;
+            }
+            byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+            joinColType = DataType.mergeType(joinColType, innerType) ;
+        }
+
+        return joinColType ;
+    }
+
+    public Schema getTupleJoinColSchema() throws FrontendException {
+        if (!isTupleJoinCol()) {
+            throw new FrontendException("getTupleJoinColSchema is used only when"
+                                     + " dealing with tuple join col") ;
+        }
+
+        // this fsList represents all the columns in join tuple
+        List<Schema.FieldSchema> fsList = new ArrayList<Schema.FieldSchema>() ;
+
+        int outputSchemaSize = getJoinColPlans().get(getInputs().get(0)).size() ;
+
+        // by default, they are all bytearray
+        // for type checking, we don't care about aliases
+        for(int i=0; i<outputSchemaSize; i++) {
+            fsList.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)) ;
+        }
+
+        // merge all the inner plan outputs so we know what type
+        // our join column should be
+        for(int i=0;i < getInputs().size(); i++) {
+            LogicalOperator input = getInputs().get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(getJoinColPlans().get(input)) ;
+
+            boolean seenProjectStar = false;
+            for(int j=0;j < innerPlans.size(); j++) {
+                byte innerType = innerPlans.get(j).getSingleLeafPlanOutputType() ;
+                ExpressionOperator eOp = (ExpressionOperator)innerPlans.get(j).getSingleLeafPlanOutputOp();
+
+                if(eOp instanceof LOProject) {
+                    if(((LOProject)eOp).isStar()) {
+                        seenProjectStar = true;
+                    }
+                }
+                        
+                Schema.FieldSchema joinFs = fsList.get(j);
+                joinFs.type = DataType.mergeType(joinFs.type, innerType) ;
+                Schema.FieldSchema fs = eOp.getFieldSchema();
+                if(null != fs) {
+                    joinFs.setParent(eOp.getFieldSchema().canonicalName, eOp);
+                } else {
+                    joinFs.setParent(null, eOp);
+                }
+            }
+
+            if(seenProjectStar && innerPlans.size() > 1) {
+                throw new FrontendException("joining attributes can either be star (*) or a list of expressions, but not both.");
+                
+            }
+
+        }
+
+        return new Schema(fsList) ;
+    }
+    
+
+}

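The alias de-duplication in getSchema() can be traced on a small example. A
hedged illustration (input aliases and schemas assumed):

    // A: (x: int, y: int)    joined with    B: (x: int, z: int)
    // Qualified fields are computed first:  A::x, A::y, B::x, B::z
    // 'y' and 'z' each occur in only one input, so they are exposed under
    // their plain aliases; 'x' occurs in both inputs, so both copies stay
    // qualified to avoid ambiguity:
    //   resulting schema: (A::x, y, B::x, z)
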
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Wed Jan  7 17:18:29 2009
@@ -146,6 +146,15 @@
                 }
             }
         }
+        else if(node instanceof LOFRJoin){
+            MultiMap<LogicalOperator, LogicalPlan> plans = ((LOFRJoin)node).getJoinColPlans();
+            for (LogicalOperator lo : plans.keySet()) {
+                // Visit the associated plans
+                for (LogicalPlan plan : plans.get(lo)) {
+                    sb.append(planString(plan));
+                }
+            }
+        }
         else if(node instanceof LOSort){
             sb.append(planString(((LOSort)node).getSortColPlans())); 
         }

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Wed Jan  7 17:18:29 2009
@@ -135,6 +135,24 @@
             }
         }
     }
+    
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        // Visit each of the inputs of the join.
+        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = frj.getJoinColPlans();
+        for(LogicalOperator op: frj.getInputs()) {
+            for(LogicalPlan lp: mapGByPlans.get(op)) {
+                if (null != lp) {
+                    // TODO FIX - How do we know this should be a
+                    // DependencyOrderWalker?  We should be replicating the
+                    // walker the current visitor is using.
+                    PlanWalker w = new DependencyOrderWalker(lp);
+                    pushWalker(w);
+                    w.walk(this);
+                    popWalker();
+                }
+            }
+        }
+    }
 
     /**
      * 

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Wed Jan  7 17:18:29 2009
@@ -57,6 +57,11 @@
         op.setPlan(mCurrentWalker.getPlan());
         super.visit(op);
     }
+    
+    public void visit(LOFRJoin op) throws VisitorException {
+        op.setPlan(mCurrentWalker.getPlan());
+        super.visit(op);
+    }
 
     public void visit(LOConst op) throws VisitorException {
         op.setPlan(mCurrentWalker.getPlan());

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Wed Jan  7 17:18:29 2009
@@ -74,6 +74,32 @@
             mapGByPlans.put(op, newGByPlans);
         }
     }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.logicalLayer.LOVisitor#visit(org.apache.pig.impl.logicalLayer.LOFRJoin)
+     */
+    @Override
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        //get the attributes of LOFRJoin that are modified during the translation
+        
+        MultiMap<LogicalOperator, LogicalPlan> joinColPlans = frj.getJoinColPlans();
+
+        for(LogicalOperator op: frj.getInputs()) {
+            ArrayList<LogicalPlan> newPlansAfterTranslation = new ArrayList<LogicalPlan>();
+            for(LogicalPlan lp: joinColPlans.get(op)) {
+                if (checkPlanForProjectStar(lp)) {
+                    ArrayList<LogicalPlan> translatedPlans = translateProjectStarInPlan(lp);
+                    for(int j = 0; j < translatedPlans.size(); ++j) {
+                        newPlansAfterTranslation.add(translatedPlans.get(j));
+                    }
+                } else {
+                    newPlansAfterTranslation.add(lp);
+                }
+            }
+            joinColPlans.removeKey(op);
+            joinColPlans.put(op, newPlansAfterTranslation);
+        }
+    }
 
     /**
      * 

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java Wed Jan  7 17:18:29 2009
@@ -60,6 +60,7 @@
         NodeIdGenerator idGen = NodeIdGenerator.getGenerator();
         LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, 
                 idGen.getNextNodeId(scope)), new ArrayList<LogicalOperator>());
+        splitOp.setAlias(nodes.get(0).getAlias());
         try {
             mPlan.add(splitOp);
             
@@ -136,6 +137,7 @@
                 splitOp.addOutput(splitOutput);
                 mPlan.add(splitOutput);
                 mPlan.insertBetween(splitOp, splitOutput, succ);
+                splitOutput.setAlias(splitOp.getAlias());
                 // Patch up the contained plans of succ
                 fixUpContainedPlans(nodes.get(0), splitOutput, succ, null);
             }

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java Wed Jan  7 17:18:29 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.impl.plan.optimizer.Transformer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -182,6 +183,10 @@
             LOCogroup cg = (LOCogroup) before ;
             cg.switchGroupByPlanOp(after, newNode);
         }
+        if (before instanceof LOFRJoin) {
+            LOFRJoin frj = (LOFRJoin) before ;
+            frj.switchJoinColPlanOp(after, newNode);
+        }
 
         // Visit all the inner plans of before and change their projects to
         // connect to newNode instead of after.
@@ -189,7 +194,10 @@
         List<LogicalPlan> plans = new ArrayList<LogicalPlan>();
         if (before instanceof LOCogroup) {
             plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
-        } else if (before instanceof LOSort) {
+        } else if (before instanceof LOFRJoin) {
+            plans.addAll((((LOFRJoin)before).getJoinColPlans()).values());
+        } else if (before instanceof LOSort) {
             plans.addAll(((LOSort)before).getSortColPlans());
         } else if (before instanceof LOFilter) {
             plans.add(((LOFilter)before).getComparisonPlan());

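LOFRJoin.switchJoinColPlanOp itself lives in the newly added LOFRJoin.java, which is not
shown in this excerpt. Mirroring LOCogroup.switchGroupByPlanOp and using only the MultiMap
operations already visible in this patch, a plausible shape for it is the sketch below
(the field name mJoinColPlans is an assumption):

    // Assumed field, analogous to LOCogroup's group-by plan map:
    // private MultiMap<LogicalOperator, LogicalPlan> mJoinColPlans;

    public void switchJoinColPlanOp(LogicalOperator oldOp,
                                    LogicalOperator newOp) {
        // Re-key this input's join-column plans from the operator that
        // was replaced to the operator the optimizer put in its place.
        ArrayList<LogicalPlan> plans =
                new ArrayList<LogicalPlan>(mJoinColPlans.get(oldOp));
        mJoinColPlans.removeKey(oldOp);
        mJoinColPlans.put(newOp, plans);
    }
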
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Wed Jan  7 17:18:29 2009
@@ -204,5 +204,11 @@
         project.unsetFieldSchema();
         super.visit(project);
     }
+
+    @Override
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        frj.unsetSchema();
+        super.visit(frj);
+    }
     
 }

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Jan  7 17:18:29 2009
@@ -189,7 +189,7 @@
             // position that has a type other than byte array.
             LOForEach foreach = new LOForEach(mPlan,
                 OperatorKey.genOpKey(scope), genPlans, flattens);
-
+            foreach.setAlias(lo.getAlias());
             // Insert the foreach into the plan and patch up the plan.
             insertAfter(lo, foreach, null);
 

Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Jan  7 17:18:29 2009
@@ -221,6 +221,59 @@
 		log.trace("Exiting parseCogroup");
 		return cogroup;
 	}
+	
+	/**
+	 * Mimicking parseCogroup, as the parsing logic for FRJoin remains exactly the same.
+	 */
+	LogicalOperator parseFRJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+		
+		log.trace("Entering parseFRJoin");
+		log.debug("LogicalPlan: " + lp);
+		
+		int n = gis.size();
+		log.debug("Number of join inputs = " + n);
+		
+		ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
+		ArrayList<ArrayList<LogicalPlan>> plans = new ArrayList<ArrayList<LogicalPlan>>();
+		MultiMap<LogicalOperator, LogicalPlan> groupByPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+		//Map<LogicalOperator, LogicalPlan> groupByPlans = new HashMap<LogicalOperator, LogicalPlan>();
+		boolean[] isInner = new boolean[n];
+		
+		int arity = gis.get(0).plans.size();
+		
+		for (int i = 0; i < n ; i++){
+			
+			CogroupInput gi = gis.get(i);
+			los.add(gi.op);
+			ArrayList<LogicalPlan> planList = gi.plans;
+			plans.add(gi.plans);
+			int numGrpByOps = planList.size();
+			log.debug("Number of join column plans = " + numGrpByOps);
+
+			if(arity != numGrpByOps) {
+				throw new ParseException("The arity of the join columns does not match.");
+			}
+			for(int j = 0; j < numGrpByOps; ++j) {
+			    groupByPlans.put(gi.op, planList.get(j));
+				for(LogicalOperator root: planList.get(j).getRoots()) {
+					log.debug("Join input plan root: " + root);
+				}
+			}
+			isInner[i] = gi.isInner;
+		}
+		
+		LogicalOperator frj = new LOFRJoin(lp, new OperatorKey(scope, getNextId()), groupByPlans, isInner, gis.get(0).op);
+		lp.add(frj);
+		log.debug("Added operator " + frj.getClass().getName() + " object " + frj + " to the logical plan " + lp);
+		
+		for(LogicalOperator op: los) {
+			lp.connect(op, frj);
+			log.debug("Connected operator " + op.getClass().getName() + " to " + frj.getClass().getName() + " in the logical plan");
+		}
+
+		log.trace("Exiting parseFRJoin");
+		return frj;
+	}
 			
 	/**
 	 * The join operator is translated to foreach 
@@ -1598,11 +1651,14 @@
 	ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
 	log.trace("Entering JoinClause");
 	log.debug("LogicalPlan: " + lp);
+	LogicalOperator frj = null;
 }
 {
 	(gi = GroupItem(lp) { gis.add(gi); }
-	("," gi = GroupItem(lp) { gis.add(gi); })+)
-	{log.trace("Exiting JoinClause"); return rewriteJoin(gis, lp);}
+	("," gi = GroupItem(lp) { gis.add(gi); })+
+	// An optional USING "replicated" (or "repl") clause selects the fragment replicate join (FRJoin)
+	[<USING> ("\"replicated\"" | "\"repl\"") { frj = parseFRJoin(gis, lp); }] )
+	{log.trace("Exiting JoinClause"); return (frj==null) ? rewriteJoin(gis, lp) : frj;}
 	
 }
 

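End to end, the new clause is driven as below; a minimal runnable sketch using local mode
(paths, aliases, and the output location are hypothetical):

    import java.io.IOException;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class FRJoinExample {
        public static void main(String[] args) throws IOException {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("big = LOAD 'big_input' AS (k:int, v:chararray);");
            pig.registerQuery("small = LOAD 'small_input' AS (k:int, w:chararray);");
            // USING "replicated" (or "repl") takes the parseFRJoin path
            // above and builds an LOFRJoin; without it, JOIN keeps the
            // usual cogroup-plus-foreach rewrite. The left-most input is
            // the fragmented one; the others are replicated to each map.
            pig.registerQuery("J = JOIN big BY k, small BY k USING \"replicated\";");
            pig.store("J", "frjoin_out");
        }
    }
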
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Wed Jan  7 17:18:29 2009
@@ -2092,6 +2092,9 @@
                 // We may have to compute the schema of the input again
                 // because we have just inserted
                 if (insertedOp != null) {
+                    if(insertedOp.getAlias()==null){
+                        insertedOp.setAlias(inputs.get(i).getAlias());
+                    }
                     try {
                         this.visit(insertedOp);
                     }
@@ -2328,6 +2331,135 @@
             throw vse ;
         }
     }
+    
+    /**
+     * Mimics the type checking of LOCogroup
+     */
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        try {
+            frj.regenerateSchema();
+        } catch (FrontendException fe) {
+            String msg = "Cannot resolve FRJoin output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg, fe) ;
+            throw vse ;
+        }
+        MultiMap<LogicalOperator, LogicalPlan> joinColPlans
+                                                    = frj.getJoinColPlans() ;
+        List<LogicalOperator> inputs = frj.getInputs() ;
+        
+        // Type checking internal plans.
+        for(int i=0;i < inputs.size(); i++) {
+            LogicalOperator input = inputs.get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+
+            for(int j=0; j < innerPlans.size(); j++) {
+
+                LogicalPlan innerPlan = innerPlans.get(j) ;
+                
+                // Check that the inner plan has only 1 output port
+                if (!innerPlan.isSingleLeafPlan()) {
+                    String msg = "FRJoin's inner plans can only"
+                                 + " have one output (leaf)" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
+                }
+
+                checkInnerPlan(innerPlans.get(j)) ;
+            }
+        }
+        
+        try {
+
+            if (!frj.isTupleJoinCol()) {
+                // merge all the inner plan outputs so we know what type
+                // our join column should be
+
+                // TODO: Don't recompute schema here
+                //byte groupType = schema.getField(0).type ;
+                byte groupType = frj.getAtomicJoinColType() ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+                    // Checking innerPlan size already done above
+                    byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+                    if (innerType != groupType) {
+                        insertAtomicCastForFRJInnerPlan(innerPlans.get(0),
+                                                            frj,
+                                                            groupType) ;
+                    }
+                }
+            }
+            else {
+
+                // TODO: Don't recompute schema here
+                //Schema groupBySchema = schema.getField(0).schema ;
+                Schema groupBySchema = frj.getTupleJoinColSchema() ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+                    for(int j=0;j < innerPlans.size(); j++) {
+                        LogicalPlan innerPlan = innerPlans.get(j) ;
+                        byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+                        byte expectedType = DataType.BYTEARRAY ;
+
+                        if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
+                            String msg = "Sorry, join on complex types"
+                                       + " will be supported soon" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            VisitorException vse = new VisitorException(msg) ;
+                            throw vse ;
+                        }
+
+                        try {
+                            expectedType = groupBySchema.getField(j).type ;
+                        }
+                        catch(ParseException pe) {
+                            String msg = "Cannot resolve FRJoin output schema" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            VisitorException vse = new VisitorException(msg) ;
+                            vse.initCause(pe) ;
+                            throw vse ;
+                        }
+
+                        if (innerType != expectedType) {
+                            insertAtomicCastForFRJInnerPlan(innerPlan,
+                                                                frj,
+                                                                expectedType) ;
+                        }
+                    }
+                }
+            }
+        }
+        catch (FrontendException fe) {
+            String msg = "Cannot resolve FRJoin output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg, fe) ;
+            throw vse ;
+        }
+
+        // TODO: Don't recompute schema here. Remove all from here!
+        // Generate output schema based on the schema generated from
+        // the FRJoin itself
+
+        try {
+            Schema outputSchema = frj.regenerateSchema() ;
+        }
+        catch (FrontendException fe) {
+            String msg = "Cannot resolve FRJoin output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            VisitorException vse = new VisitorException(msg) ;
+            vse.initCause(fe) ;
+            throw vse ;
+        }
+    }
 
     /**
      * COGroup
@@ -2460,7 +2592,35 @@
             throw vse ;
         }
     }
+    
+    private void insertAtomicCastForFRJInnerPlan(LogicalPlan innerPlan,
+            LOFRJoin frj, byte toType) throws VisitorException {
+        if (!DataType.isUsableType(toType)) {
+            throw new AssertionError("Cannot cast to type "
+                    + DataType.findTypeName(toType));
+        }
 
+        List<LogicalOperator> leaves = innerPlan.getLeaves();
+        if (leaves.size() > 1) {
+            throw new AssertionError(
+                    "insertAtomicForCOGroupInnerPlan cannot be"
+                            + " used when there is more than 1 output port");
+        }
+        ExpressionOperator currentOutput = (ExpressionOperator) leaves.get(0);
+        collectCastWarning(frj, currentOutput.getType(), toType);
+        OperatorKey newKey = genNewOperatorKey(currentOutput);
+        LOCast cast = new LOCast(innerPlan, newKey, currentOutput, toType);
+        innerPlan.add(cast);
+        try {
+            innerPlan.connect(currentOutput, cast);
+        } catch (PlanException ioe) {
+            AssertionError err = new AssertionError(
+                    "Explicit casting insertion");
+            err.initCause(ioe);
+            throw err;
+        }
+        this.visit(cast);
+    }
 
     // This helps insert casting to atomic types in COGroup's inner plans
     // as a new leave of the plan

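One case that drives the new insertAtomicCastForFRJInnerPlan: one input's join key is
left untyped (hence bytearray) while the other's is declared int, so the merged key type
is int and a cast is appended as a new leaf of the untyped side's inner plan. A sketch,
again assuming a PigServer named pig (schemas and paths hypothetical):

    // A's key k is undeclared, hence bytearray; B's k is int. The visitor
    // above merges the key type to int and inserts an LOCast to int at
    // the leaf of A's join-column plan.
    pig.registerQuery("A = LOAD 'a_input' AS (k, v);");
    pig.registerQuery("B = LOAD 'b_input' AS (k:int, w:int);");
    pig.registerQuery("J2 = JOIN A BY k, B BY k USING \"repl\";");
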
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java Wed Jan  7 17:18:29 2009
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import java.util.Comparator;
+import java.util.List;
 
 import org.apache.pig.data.*;
 
@@ -25,6 +26,13 @@
 // default bag factory.
 public class NonDefaultBagFactory extends BagFactory {
     public DataBag newDefaultBag() { return null; }
+    /* (non-Javadoc)
+     * @see org.apache.pig.data.BagFactory#newDefaultBag(java.util.List)
+     */
+    @Override
+    public DataBag newDefaultBag(List<Tuple> listOfTuples) {
+        return null;
+    }
     public DataBag newSortedBag(Comparator<Tuple> comp) { return null; }
     public DataBag newDistinctBag() { return null; }
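
The newDefaultBag(List) override above tracks a new BagFactory entry point added in this
commit: a bag can now be built directly over an existing tuple list, presumably via a
matching DefaultDataBag constructor (DefaultDataBag.java is also touched here). A usage
sketch; whether the list is wrapped or copied is implementation-defined, and the names
below are hypothetical:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class BagFromListExample {
        public static void main(String[] args) throws Exception {
            List<Tuple> tuples = new ArrayList<Tuple>();
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, Integer.valueOf(42));
            tuples.add(t);
            // Builds a default bag over the pre-materialized list via
            // the new factory method.
            DataBag bag = BagFactory.getInstance().newDefaultBag(tuples);
            System.out.println(bag.size());   // prints 1
        }
    }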