Posted to commits@pig.apache.org by ha...@apache.org on 2010/07/09 19:48:24 UTC

svn commit: r962618 [1/2] - in /hadoop/pig/branches/branch-0.7: ./ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ ...

Author: hashutosh
Date: Fri Jul  9 17:48:23 2010
New Revision: 962618

URL: http://svn.apache.org/viewvc?rev=962618&view=rev
Log:
PIG-1309: Map-side Cogroup
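
Map-side (merge) cogroup requires every input relation to already be sorted on
the group keys, so the cogroup can run in the mappers without a shuffle. A
minimal usage sketch (the loader classes are illustrative stand-ins for loaders
implementing CollectableLoadFunc / IndexableLoadFunc, as required by the
compiler changes below):

    // Illustrative driver, not part of this commit.
    PigServer pig = new PigServer(ExecType.MAPREDUCE);
    pig.registerQuery("A = LOAD 'left'  USING MySortedLoader() AS (k, v);");
    pig.registerQuery("B = LOAD 'right' USING MySortedIndexableLoader() AS (k, w);");
    // The 'merge' qualifier selects LOCogroup.GROUPTYPE.MERGE, which is
    // translated below into a POMergeCogroup physical operator.
    pig.registerQuery("C = COGROUP A BY k, B BY k USING 'merge';");
    pig.store("C", "output");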

Added:
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMapSideCogroup.java
Modified:
    hadoop/pig/branches/branch-0.7/CHANGES.txt
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
    hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/utils/TestHelper.java

Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Jul  9 17:48:23 2010
@@ -71,6 +71,8 @@ manner (rding via pradeepkth)
 
 IMPROVEMENTS
 
+PIG-1309: Map-side Cogroup (hashutosh)
+
 PIG-1441: new test targets (olgan)
 
 PIG-1381: Need a way for Pig to take an alternative property file (daijy)

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Jul  9 17:48:23 2010
@@ -25,6 +25,7 @@ import java.util.Properties;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 public class ConfigurationUtil {
 
@@ -62,4 +63,12 @@ public class ConfigurationUtil {
         }
         
     }
+    
+    public static Properties getLocalFSProperties() {
+        Configuration localConf = new Configuration(false);
+        localConf.addResource("core-default.xml");
+        Properties props = ConfigurationUtil.toProperties(localConf);
+        props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+        return props;
+    }
 }
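
The new helper builds a Configuration that always points at the local file
system, regardless of the job's default FS. A sketch of how it might be
consumed (assuming the existing ConfigurationUtil.toConfiguration() counterpart
of toProperties(); variable names are illustrative):

    // Obtain a FileSystem handle that is guaranteed to be the local one,
    // e.g. for reading an index file shipped to the task's local disk.
    Properties props = ConfigurationUtil.getLocalFSProperties();
    Configuration localConf = ConfigurationUtil.toConfiguration(props);
    FileSystem localFs = FileSystem.get(localConf);   // backed by file:///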

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Jul  9 17:48:23 2010
@@ -59,6 +59,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -812,7 +813,7 @@ public class JobControlCompiler{
         // global sort set (because in that case it's the sampling job) or if
         // it's a limit after a sort. 
         boolean hasOrderBy = false;
-        if (mro.isGlobalSort() || mro.isLimitAfterSort()) {
+        if (mro.isGlobalSort() || mro.isLimitAfterSort() || mro.usingTypedComparator()) {
             hasOrderBy = true;
         } else {
             List<MapReduceOper> succs = plan.getSuccessors(mro);
@@ -1106,6 +1107,29 @@ public class JobControlCompiler{
                 throw new VisitorException(msg, e);
             }
          }
+         
+         @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+          
+             // XXX Hadoop currently doesn't support distributed cache in local mode.
+             // This line will be removed after the support is added
+             if (pigContext.getExecType() == ExecType.LOCAL) return;
+             
+             String indexFile = mergeCoGrp.getIndexFileName();
+             
+             if (indexFile == null) throw new VisitorException("No index file");
+             
+             try {
+                String symlink = addSingleFileToDistributedCache(pigContext,
+                        conf, indexFile, "indexfile_mergecogrp_");
+                mergeCoGrp.setIndexFileName(symlink);
+            } catch (IOException e) {
+                String msg = "Internal error. Distributed cache could not " +
+                        "be set up for merge cogrp index file";
+                throw new VisitorException(msg, e);
+            }
+        }
      }
     
 }
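
The addSingleFileToDistributedCache() helper invoked here already exists in
this class (the merge-join index path uses it too); its essential effect can be
sketched with plain Hadoop calls, where indexFile and conf are the method's own
variables and the symlink name is illustrative:

    // Ship the index file to every task and expose it under a stable symlink.
    String symlink = "indexfile_mergecogrp_" + System.currentTimeMillis();
    DistributedCache.addCacheFile(new URI(indexFile + "#" + symlink), conf);
    DistributedCache.createSymlink(conf);
    // Each task can then open the index simply as new Path(symlink).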

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jul  9 17:48:23 2010
@@ -61,6 +61,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
@@ -766,7 +767,7 @@ public class MRCompiler extends PhyPlanV
         }
     }
     
-    public void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
+    private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
     {
         POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
         
@@ -795,7 +796,7 @@ public class MRCompiler extends PhyPlanV
         mro.reducePlan.addAsLeaf(getPlainForEachOP());
     }
     
-    public void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
+    private void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
     {
         PhysicalPlan ep = new PhysicalPlan();
         POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -830,7 +831,7 @@ public class MRCompiler extends PhyPlanV
         mro.reducePlan.addAsLeaf(getPlainForEachOP());
     }
     
-    public POForEach getPlainForEachOP()
+    private POForEach getPlainForEachOP()
     {
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         List<Boolean> flat1 = new ArrayList<Boolean>();
@@ -925,12 +926,12 @@ public class MRCompiler extends PhyPlanV
             }
             
             POLoad loader = (POLoad)phyOp;
-            LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec());
-            try {
-                if(!(loadFunc instanceof CollectableLoadFunc)){
-                    throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.");
-                }
-                loadFunc.setUDFContextSignature(loader.getSignature());
+            Object loadFunc = PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec());
+            try {
+                if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+                    throw new MRCompilerException("While using 'collected' on group, data must be loaded via a loader implementing CollectableLoadFunc.");
+                }
+                ((LoadFunc)loadFunc).setUDFContextSignature(loader.getSignature());
                 ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
             } catch (MRCompilerException e){
                 throw (e);
@@ -1082,6 +1083,184 @@ public class MRCompiler extends PhyPlanV
         }
     }
 
+    /** The leftmost relation is referred to as the base relation (this is the one fed to the mappers).
+     *  First, close all MROpers except the first one (referred to as baseMROp).
+     *  Then, create an MROper which will do the indexing job (indexerMROp).
+     *  Connect indexerMROp before baseMROp in the MRPlan.
+     */
+
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+
+        if(compiledInputs.length < 2){
+            String errMsg = "Merge Cogroup work on two or more relations." +
+            		"To use map-side group-by on single relation, use 'collected' qualifier.";
+            throw new MRCompilerException(errMsg);
+        }
+            
+        List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
+        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
+        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
+        
+        try{
+            // Iterate through all the MROpers, disconnect side MROPers from 
+            // MROPerPlan and collect all the information needed in different lists.
+            
+            for(int i=0 ; i < compiledInputs.length; i++){
+                
+                MapReduceOper mrOper = compiledInputs[i];
+                PhysicalPlan mapPlan = mrOper.mapPlan;
+                if(mapPlan.getRoots().size() != 1){
+                    int errCode = 2171;
+                    String errMsg = "Expected one but found more than one root physical operator in physical plan.";
+                    throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+                }
+
+                PhysicalOperator rootPOOp = mapPlan.getRoots().get(0);
+                if(! (rootPOOp instanceof POLoad)){
+                    int errCode = 2172;
+                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
+                    throw new MRCompilerException(errMsg,errCode);
+                }
+                
+                POLoad sideLoader = (POLoad)rootPOOp;
+                FileSpec loadFileSpec = sideLoader.getLFile();
+                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+                Object loadfunc = PigContext.instantiateFuncFromSpec(funcSpec);
+                if(i == 0){
+                    
+                    if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass())))
+                        throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.");
+                    
+                    ((LoadFunc)loadfunc).setUDFContextSignature(sideLoader.getSignature());
+                    ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
+                    continue;
+                }
+                if(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass())))
+                    throw new MRCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.");
+                
+                funcSpecs.add(funcSpec);
+                fileSpecs.add(loadFileSpec.getFileName());
+                loaderSigns.add(sideLoader.getSignature());
+                MRPlan.remove(mrOper);
+            }
+            
+            poCoGrp.setSideLoadFuncs(funcSpecs);
+            poCoGrp.setSideFileSpecs(fileSpecs);
+            poCoGrp.setLoaderSignatures(loaderSigns);
+            
+            // Use map-reduce operator of base relation for the cogroup operation.
+            MapReduceOper baseMROp = phyToMROpMap.get(poCoGrp.getInputs().get(0));
+            if(baseMROp.mapDone || !baseMROp.reducePlan.isEmpty())
+                throw new MRCompilerException("Currently merge cogroup is not supported after blocking operators.");
+            
+            // Create new map-reduce operator for indexing job and then configure it.
+            MapReduceOper indexerMROp = getMROp();
+            FileSpec idxFileSpec = getIndexingJob(indexerMROp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
+            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+            
+            baseMROp.mapPlan.addAsLeaf(poCoGrp);
+            MRPlan.add(indexerMROp);
+            MRPlan.connect(indexerMROp, baseMROp);
+
+            phyToMROpMap.put(poCoGrp,baseMROp);
+            // Going forward, new operators should be added in baseMRop. To make
+            // sure, reset curMROp.
+            curMROp = baseMROp;
+        }
+        catch (ExecException e){
+           throw new MRCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e);
+        }
+        catch (MRCompilerException mrce){
+            throw(mrce);
+        }
+        catch (CloneNotSupportedException e) {
+            throw new MRCompilerException(e);
+        }
+        catch(PlanException e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+        catch (IOException e){
+            int errCode = 3000;
+            String errMsg = "IOException caught while compiling POMergeCoGroup";
+            throw new MRCompilerException(errMsg, errCode,e);
+        }
+    }
+    
+    // Sets up the indexing job for map-side cogroups.
+    private FileSpec getIndexingJob(MapReduceOper indexerMROp, 
+            final MapReduceOper baseMROp, final List<PhysicalPlan> mapperLRInnerPlans)
+        throws MRCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+        
+        // First replace loader with  MergeJoinIndexer.
+        PhysicalPlan baseMapPlan = baseMROp.mapPlan;
+        POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);                            
+        FileSpec origLoaderFileSpec = baseLoader.getLFile();
+        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+        Object loadFunc = PigContext.instantiateFuncFromSpec(funcSpec);
+        
+        if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+            int errCode = 1104;
+            String errMsg = "Base relation of merge-coGroup must implement " +
+            "OrderedLoadFunc interface. The specified loader " 
+            + funcSpec + " doesn't implement it";
+            throw new MRCompilerException(errMsg,errCode);
+        }
+        
+        String[] indexerArgs = new String[6];
+        indexerArgs[0] = funcSpec.toString();
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
+        indexerArgs[3] = baseLoader.getSignature();
+        indexerArgs[4] = baseLoader.getOperatorKey().scope;
+        indexerArgs[5] = Boolean.toString(false); // we care about nulls.
+            
+        PhysicalPlan phyPlan;
+        if (baseMapPlan.getSuccessors(baseLoader) == null 
+                || baseMapPlan.getSuccessors(baseLoader).isEmpty()){
+            // Load-Load-Cogroup case.
+            phyPlan = null; 
+        }
+            
+        else{ // We got something. Yank it and set it as inner plan.
+            phyPlan = baseMapPlan.clone();
+            PhysicalOperator root = phyPlan.getRoots().get(0);
+            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+            phyPlan.remove(root);
+
+        }
+        indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+        POLoad idxJobLoader = getLoad();
+        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+        indexerMROp.mapPlan.add(idxJobLoader);
+        
+        // Loader of mro will return a tuple of form - 
+        // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
+        
+        // After getting an index entry in each mapper, send all of them to one 
+        // reducer where they will be sorted on the way by Hadoop.
+        simpleConnectMapToReduce(indexerMROp);
+        
+        indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
+        
+        // We want to use typed tuple comparator for this job, instead of default 
+        // raw binary comparator used by Pig, to make sure index entries are 
+        // sorted correctly by Hadoop.
+        indexerMROp.useTypedComparator(true); 
+
+        POStore st = getStore();
+        FileSpec strFile = getTempFileSpec();
+        st.setSFile(strFile);
+        indexerMROp.reducePlan.addAsLeaf(st);
+        indexerMROp.setReduceDone(true);
+
+        return strFile;
+    }
+    
     /** Since merge-join works on two inputs there are exactly two MROper predecessors identified  as left and right.
      *  Instead of merging two operators, both are used to generate a MR job each. First MR oper is run to generate on-the-fly index on right side.
      *  Second is used to actually do the join. First MR oper is identified as rightMROper and second as curMROper.
@@ -1194,7 +1373,7 @@ public class MRCompiler extends PhyPlanV
             }
             
             joinOp.setupRightPipeline(rightPipelinePlan);
-	    rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.        
+            rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.        
             
             // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
             POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);            
@@ -1236,8 +1415,9 @@ public class MRCompiler extends PhyPlanV
                     }
                 }
             } else {
+                
                 // Replace POLoad with  indexer.
-                String[] indexerArgs = new String[3];
+                String[] indexerArgs = new String[6];
                 FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
                 indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
                 if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof OrderedLoadFunc)){
@@ -1250,73 +1430,19 @@ public class MRCompiler extends PhyPlanV
                 List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
                 indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
                 indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+                indexerArgs[3] = rightLoader.getSignature();
+                indexerArgs[4] = rightLoader.getOperatorKey().scope;
+                indexerArgs[5] = Boolean.toString(true);
+                
                 FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
                 rightLoader.setLFile(lFile);
     
                 // Loader of mro will return a tuple of form - 
                 // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
-                // Now set up a POLocalRearrange which has "all" as the key and tuple fetched
-                // by loader as the "value" of POLocalRearrange
-                // Sorting of index can possibly be achieved by using Hadoop sorting 
-                // between map and reduce instead of Pig doing sort. If that is so, 
-                // it will simplify lot of the code below.
-                
-                PhysicalPlan lrPP = new PhysicalPlan();
-                ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                ce.setValue("all");
-                ce.setResultType(DataType.CHARARRAY);
-                lrPP.add(ce);
-    
-                List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
-                lrInnerPlans.add(lrPP);
-    
-                POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                lr.setIndex(0);
-                lr.setKeyType(DataType.CHARARRAY);
-                lr.setPlans(lrInnerPlans);
-                lr.setResultType(DataType.TUPLE);
-                rightMROpr.mapPlan.addAsLeaf(lr);
-    
-                rightMROpr.setMapDone(true);
-    
-                // On the reduce side of this indexing job, there will be a global rearrange followed by POSort.
-                // Output of POSort will be index file dumped on the DFS.
-    
-                // First add POPackage.
-                POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                pkg.setKeyType(DataType.CHARARRAY);
-                pkg.setNumInps(1); 
-                pkg.setInner(new boolean[]{false});
-                rightMROpr.reducePlan.add(pkg);
-    
-                // Next project tuples from the bag created by POPackage.
-                POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                topPrj.setColumn(1);
-                topPrj.setResultType(DataType.TUPLE);
-                topPrj.setOverloaded(true);
-                rightMROpr.reducePlan.add(topPrj);
-                rightMROpr.reducePlan.connect(pkg, topPrj);
-    
-                // Now create and add POSort. Sort plan is project *.
-                List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
-                PhysicalPlan innerSortPlan = new PhysicalPlan();
-                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                prj.setStar(true);
-                prj.setOverloaded(false);
-                prj.setResultType(DataType.TUPLE);
-                innerSortPlan.add(prj);
-                sortPlans.add(innerSortPlan);
-    
-                // Currently we assume all columns are in asc order.
-                // Add two because filename and offset are added by Indexer in addition to keys.
-                List<Boolean>  mAscCols = new ArrayList<Boolean>(rightInpPlans.size()+2);
-                for(int i=0; i< rightInpPlans.size()+2; i++)
-                    mAscCols.add(true);
-    
-                POSort sortOp = new POSort(new OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
-                rightMROpr.reducePlan.add(sortOp);
-                rightMROpr.reducePlan.connect(topPrj, sortOp);
-    
+
+                simpleConnectMapToReduce(rightMROpr);
+                rightMROpr.useTypedComparator(true);
+                
                 POStore st = getStore();
                 FileSpec strFile = getTempFileSpec();
                 st.setSFile(strFile);
@@ -1334,12 +1460,8 @@ public class MRCompiler extends PhyPlanV
                 joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());  
                 
                 joinOp.setIndexFile(strFile.getFileName());
-                 
             }
             
-   
-//            joinOp.setIndexFile(strFile);
-            
             // We are done with right side. Lets work on left now.
             // Join will be materialized in leftMROper.
             if(!curMROp.mapDone) // Life is easy 
@@ -1657,7 +1779,7 @@ public class MRCompiler extends PhyPlanV
         throw new PlanException(msg, errCode, PigException.BUG);
     }
     
-    public MapReduceOper getSortJob(
+    private MapReduceOper getSortJob(
             POSort sort,
             MapReduceOper quantJob,
             FileSpec lFile,
@@ -1818,7 +1940,7 @@ public class MRCompiler extends PhyPlanV
         return mro;
     }
 
-    public Pair<MapReduceOper,Integer> getQuantileJob(
+    private Pair<MapReduceOper,Integer> getQuantileJob(
             POSort inpSort,
             MapReduceOper prevJob,
             FileSpec lFile,
@@ -1854,7 +1976,7 @@ public class MRCompiler extends PhyPlanV
     /**
      * Create Sampling job for skewed join.
      */
-    public Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob, 
+    private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob, 
     		FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
     	    	
     	MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
@@ -1932,7 +2054,7 @@ public class MRCompiler extends PhyPlanV
      * @throws VisitorException
      */
   	@SuppressWarnings("deprecation")
-    protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
+    private Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
   			FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans, 
   			String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
   		

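In terms of plan shape, visitMergeCoGroup() compiles one cogroup into two
chained jobs. A condensed restatement of the wiring it performs (all names
taken from the patch):

    // Job 1 (indexerMROp): read the first key(s) of each split of the base
    // relation and write a sorted index of (key.., WritableComparable,
    // splitIndex) entries through a single reducer.
    MRPlan.add(indexerMROp);
    MRPlan.connect(indexerMROp, baseMROp);    // index job runs first
    // Job 2 (baseMROp): a map-side pass over the base relation; POMergeCogroup
    // sits at the leaf of the map plan and pulls the side relations itself
    // through their IndexableLoadFuncs, seeded by the index from job 1.
    baseMROp.mapPlan.addAsLeaf(poCoGrp);
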
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Jul  9 17:48:23 2010
@@ -18,18 +18,12 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.Operator;
@@ -41,7 +35,7 @@ import org.apache.pig.impl.plan.VisitorE
  * Acts as a host to the plans that will
  * execute in map, reduce and optionally combine
  * phases. These will be embedded in the MROperPlan
- * in order to capture the dependecies amongst jobs.
+ * in order to capture the dependencies amongst jobs.
  */
 public class MapReduceOper extends Operator<MROpPlanVisitor> {
     private static final long serialVersionUID = 1L;
@@ -133,6 +127,12 @@ public class MapReduceOper extends Opera
     // Used by Skewed Join
 	private String skewedJoinPartitionFile;
 	
+	// Flag to communicate from MRCompiler to JobControlCompiler what kind of
+	// comparator Hadoop should use for sorting in this MROper.
+	// By default, set to false, which makes Pig provide raw binary comparators.
+	// Set to true in the indexing jobs generated for map-side cogroup and merge join.
+	private boolean usingTypedComparator = false;
+	
     public MapReduceOper(OperatorKey k) {
         super(k);
         mapPlan = new PhysicalPlan();
@@ -383,4 +383,12 @@ public class MapReduceOper extends Opera
     public void setUseSecondaryKey(boolean useSecondaryKey) {
         this.useSecondaryKey = useSecondaryKey;
     }
+
+    protected boolean usingTypedComparator() {
+        return usingTypedComparator;
+    }
+
+    protected void useTypedComparator(boolean useTypedComparator) {
+        this.usingTypedComparator = useTypedComparator;
+    }
 }
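
The new flag is a two-phase handshake: MRCompiler sets it on the indexing job,
and JobControlCompiler reads it when deciding which sort comparator to install.
Condensed from the hunks above:

    // Producer side, in MRCompiler.getIndexingJob():
    indexerMROp.useTypedComparator(true);   // index entries need typed sorting
    // Consumer side, in JobControlCompiler:
    if (mro.isGlobalSort() || mro.isLimitAfterSort() || mro.usingTypedComparator()) {
        hasOrderBy = true;   // install typed comparators instead of raw binary ones
    }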

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Fri Jul  9 17:48:23 2010
@@ -58,6 +58,7 @@ public class MergeJoinIndexer  extends L
     private Tuple dummyTuple = null;
     private LoadFunc loader;
     private PigSplit pigSplit = null;
+    private boolean ignoreNullKeys;
     
     /** @param funcSpec : Loader specification.
      *  @param innerPlan : This is serialized version of LR plan. We 
@@ -67,12 +68,16 @@ public class MergeJoinIndexer  extends L
      * @throws ExecException 
      */
     @SuppressWarnings("unchecked")
-    public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{
+    public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan, 
+            String udfCntxtSignature, String scope, String ignoreNulls) throws ExecException{
         
         loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
+        loader.setUDFContextSignature(udfCntxtSignature);
+        this.ignoreNullKeys = Boolean.parseBoolean(ignoreNulls);
+        
         try {
             List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
-            lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
+            lr = new POLocalRearrange(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
             lr.setPlans(innerPlans);
             keysCnt = innerPlans.size();
             precedingPhyPlan = (PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan);
@@ -121,7 +126,7 @@ public class MergeJoinIndexer  extends L
                 lr.attachInput(readTuple);
                 key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
                 lr.detachInput();
-                if ( null == key) // Tuple with null key. Drop it.
+                if ( null == key && ignoreNullKeys) // Tuple with null key. Drop it.
                     continue;
                 break;      
             }
@@ -141,7 +146,7 @@ public class MergeJoinIndexer  extends L
                     lr.attachInput((Tuple)res.result);
                     key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
                     lr.detachInput();
-                    if ( null == key) // Tuple with null key. Drop it.
+                    if ( null == key && ignoreNullKeys) // Tuple with null key. Drop it.
                         continue;
                      fetchNewTup = false;
                     break;
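
The six constructor arguments arrive through the FuncSpec built in
MRCompiler.getIndexingJob() (and, for merge join, in the visitMergeJoin path).
A sketch of the round trip, mirroring the indexerArgs packing shown earlier:

    String[] indexerArgs = new String[6];
    indexerArgs[0] = loaderFuncSpec.toString();                    // real loader
    indexerArgs[1] = ObjectSerializer.serialize((Serializable) lrInnerPlans);
    indexerArgs[2] = ObjectSerializer.serialize(precedingPhyPlan); // may be null
    indexerArgs[3] = loader.getSignature();
    indexerArgs[4] = loader.getOperatorKey().scope;
    indexerArgs[5] = Boolean.toString(false);  // false: keep null keys (cogroup)
    LoadFunc indexer = (LoadFunc) PigContext.instantiateFuncFromSpec(
            new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));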

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Fri Jul  9 17:48:23 2010
@@ -296,4 +296,10 @@ public class PhyPlanSetter extends PhyPl
         preCombinerLocalRearrange.setParentPlan(parent);
     }
 
+
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+            throws VisitorException {
+        mergeCoGrp.setParentPlan(parent);
+    }
 }

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Jul  9 17:48:23 2010
@@ -185,7 +185,7 @@ public class PigSplit extends InputSplit
     // package level access because we don't want LoadFunc implementations
     // to get this information - this is to be used only from
     // MergeJoinIndexer
-    int getSplitIndex() {
+    public int getSplitIndex() {
         return splitIndex;
     }
 

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Jul  9 17:48:23 2010
@@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -88,6 +89,11 @@ public class EndOfAllInputSetter extends
             endOfAllInputFlag = true;
         }
 
+        @Override
+        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+                throws VisitorException {
+            endOfAllInputFlag = true;
+        }
         /**
          * @return if end of all input is present
          */

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Jul  9 17:48:23 2010
@@ -23,20 +23,13 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
-import org.apache.pig.SortInfo;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
-import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -48,7 +41,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
 import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.IsEmpty;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -666,14 +658,94 @@ public class LogToPhyTranslationVisitor 
     @Override
     public void visit(LOCogroup cg) throws VisitorException {
             
-        if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
-
+        switch (cg.getGroupType()) {
+        
+        case COLLECTED:
             translateCollectedCogroup(cg);
+            break;
 
-        } else {
-            
+        case REGULAR:
             translateRegularCogroup(cg);
+            break;
+            
+        case MERGE:
+            translateMergeCogroup(cg);
+            break;
+            
+        default:
+            throw new LogicalToPhysicalTranslatorException("Unknown CoGroup Modifier",PigException.BUG);
+        }
+    }
+    
+    private void translateMergeCogroup(LOCogroup loCogrp) throws VisitorException{
+        
+        String scope = loCogrp.getOperatorKey().scope;
+        List<LogicalOperator> inputs = loCogrp.getInputs();
+        
+        // LocalRearrange corresponding to each of input 
+        // LR is needed to extract keys out of the tuples.
+        
+        POLocalRearrange[] innerLRs = new POLocalRearrange[inputs.size()];
+        int count = 0;
+        List<PhysicalOperator> inpPOs = new ArrayList<PhysicalOperator>(inputs.size());
+        
+        for (LogicalOperator op : inputs) {
+            PhysicalOperator physOp = logToPhyMap.get(op);
+            inpPOs.add(physOp);
+            
+            List<LogicalPlan> plans = (List<LogicalPlan>)loCogrp.getGroupByPlans().get(op);
+            POLocalRearrange poInnerLR = new POLocalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+            poInnerLR.setAlias(loCogrp.getAlias());
+            // LR will contain list of physical plans, because there could be
+            // multiple keys and each key can be an expression.
+            List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+            currentPlans.push(currentPlan);
+            for (LogicalPlan lp : plans) {
+                currentPlan = new PhysicalPlan();
+                PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+                        .spawnChildWalker(lp);
+                pushWalker(childWalker);
+                mCurrentWalker.walk(this);
+                exprPlans.add(currentPlan);
+                popWalker();
+            }
+            currentPlan = currentPlans.pop();
+            try {
+                poInnerLR.setPlans(exprPlans);
+            } catch (PlanException pe) {
+                int errCode = 2071;
+                String msg = "Problem with setting up local rearrange's plans.";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+            }
+            innerLRs[count] = poInnerLR;
+            try {
+                poInnerLR.setIndex(count++);
+            } catch (ExecException e1) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new VisitorException(msg, errCode, PigException.BUG, e1);
+            }
+            poInnerLR.setKeyType(plans.size() > 1 ? DataType.TUPLE : 
+                        exprPlans.get(0).getLeaves().get(0).getResultType());
+            poInnerLR.setResultType(DataType.TUPLE);
+        }
+        
+        POMergeCogroup poCogrp = new POMergeCogroup(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)),inpPOs,innerLRs,
+                loCogrp.getRequestedParallelism());
+        poCogrp.setAlias(loCogrp.getAlias());
+        poCogrp.setResultType(DataType.TUPLE);
+        currentPlan.add(poCogrp);
+        for (LogicalOperator op : inputs) {
+            try {
+                currentPlan.connect(logToPhyMap.get(op), poCogrp);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
         }
+        logToPhyMap.put(loCogrp, poCogrp);
     }
     
     private void translateRegularCogroup(LOCogroup cg) throws VisitorException {
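
One detail worth noting in translateMergeCogroup(): the key type handed to each
inner POLocalRearrange follows the same rule as the regular cogroup
translation. Restated as a standalone snippet:

    // A single group-by expression keeps that expression's own result type;
    // multiple expressions are wrapped in a tuple key.
    byte keyType = (plans.size() > 1)
            ? DataType.TUPLE
            : exprPlans.get(0).getLeaves().get(0).getResultType();
    poInnerLR.setKeyType(keyType);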

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Jul  9 17:48:23 2010
@@ -266,6 +266,10 @@ public class PhyPlanVisitor extends Plan
     public void visitMergeJoin(POMergeJoin join) throws VisitorException {
         //do nothing
     }
+    
+    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
+        
+    }
     /**
      * @param stream
      * @throws VisitorException 

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Fri Jul  9 17:48:23 2010
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -35,7 +34,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Jul  9 17:48:23 2010
@@ -122,6 +122,9 @@ public class POLocalRearrange extends Ph
     
     private boolean useSecondaryKey = false;
     
+    // By default, we strip keys from the value.
+    private boolean stripKeyFromValue = true;
+    
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -412,7 +415,6 @@ public class POLocalRearrange extends Ph
         Object key;
         Object secondaryKey=null;
         
-        
         if (secondaryResLst!=null && secondaryResLst.size()>0)
         {
             key = getKeyFromResult(resLst, mainKeyType);
@@ -420,6 +422,13 @@ public class POLocalRearrange extends Ph
         } else
             key = getKeyFromResult(resLst, keyType);
         
+
+        if(!stripKeyFromValue){
+            lrOutput.set(1, key);
+            lrOutput.set(2, value);
+            return lrOutput;
+        }
+        
         if (mIsDistinct) {
 
             //Put the key and the indexed tuple
@@ -781,4 +790,8 @@ public class POLocalRearrange extends Ph
         
     }
 
+    protected void setStripKeyFromValue(boolean stripKeyFromValue) {
+        this.stripKeyFromValue = stripKeyFromValue;
+    }
+
 }
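
The new flag changes what getNext() places in the third field of the emitted
tuple. POMergeCogroup flips it from its constructor (possible because the
setter is protected and the two classes share a package). The effect, in
sketch form:

    // Default  (stripKeyFromValue == true):  (index, key, tupleWithKeyFieldsRemoved)
    // Cogroup  (stripKeyFromValue == false): (index, key, originalInputTuple)
    lr.setStripKeyFromValue(false);   // as POMergeCogroup does for each inner LR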

Added: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=962618&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (added)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Jul  9 17:48:23 2010
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+
+public class POMergeCogroup extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    private transient List<LoadFunc> sideLoaders;
+
+    private List<FuncSpec> sidFuncSpecs;
+
+    private List<String> sideFileSpecs;
+
+    // Local Rearranges corresponding to side Loader to extract relevant bits
+    // out of the tuple we get from side loaders.
+    private POLocalRearrange LRs[];
+
+    private transient boolean firstTime;
+
+    private transient Comparable<Object> firstKeyOfNextSplit;
+
+    // This is the count of all the relations involved in the cogroup. The
+    // mapped relation is also included in the count.
+    private transient int relationCnt;
+
+    private transient TupleFactory mTupleFactory;
+
+    private String indexFileName;
+
+    private FuncSpec idxFuncSpec; 
+
+    private transient DataBag[] outBags;
+
+    private transient Tuple prevTopOfHeap;
+
+    private List<String> loaderSignatures;
+
+    private transient boolean createNewBags;
+
+    // Heap contains tuples with exactly three fields, which is the same as the
+    // output of LR except that the third field contains the full input tuple,
+    // as opposed to the key-stripped tuple generated by LR.
+    private transient PriorityQueue<Tuple> heap;
+
+    private transient boolean lastTime;
+
+    // Need to know in each getNext() whether we generated valid output in last
+    // call or not.
+    private transient boolean workingOnNewKey;
+
+    public POMergeCogroup(OperatorKey k,List<PhysicalOperator> inpPOs, 
+            POLocalRearrange[] lrs, int parallel) {
+
+        super(k,parallel,inpPOs);
+        this.LRs = lrs;
+        for(int i=0; i < lrs.length; i++)
+            LRs[i].setStripKeyFromValue(false);
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+
+        try{
+            if(createNewBags){      
+                // This implies we just generated output tuple in last call.
+                // So, its time to create new bags.
+                for (int i=0; i < relationCnt; i++)
+                    outBags[i] = new InternalCachedBag(relationCnt);
+                createNewBags = false;
+            }
+
+            // Get the tuple from predecessor.
+            Result baseInp = processInput();
+            Tuple rearranged;
+
+            switch (baseInp.returnStatus) {
+
+            case POStatus.STATUS_OK:
+                rearranged = applyLRon((Tuple)baseInp.result, 0);
+                break;
+
+            case POStatus.STATUS_EOP:
+                if(!this.parentPlan.endOfAllInput)
+                    return baseInp;
+
+                if(lastTime)
+                    return baseInp; 
+
+                // We got all the records from mapper but have not yet generated
+                // all the outputs that we need to, so we continue filling 
+                // and draining the heap from side loaders.
+                while(!heap.isEmpty()){
+
+                    Tuple topOfHeap = heap.peek();
+
+                    if(needToBreak(topOfHeap, prevTopOfHeap))
+                        break;
+
+                    workingOnNewKey = false;
+                    topOfHeap = heap.poll();
+                    byte relIdx = (Byte)topOfHeap.get(0);
+                    prevTopOfHeap = topOfHeap;
+                    outBags[relIdx].add((Tuple)topOfHeap.get(2));
+
+                    if(relIdx == 0) // We got all the mapper has to offer.
+                        continue;
+
+                    Tuple nxtTuple = sideLoaders.get(relIdx-1).getNext();
+                    if(nxtTuple == null)
+                        continue;
+
+                    Tuple rearrangedTup = applyLRon(nxtTuple, relIdx);
+                    Object key = rearrangedTup.get(1);
+                    if(null == firstKeyOfNextSplit || null == key || firstKeyOfNextSplit.compareTo(key) > 0){
+                        heap.offer(rearrangedTup);
+                    }
+                }
+
+                // This is the last output we will produce, if heap is empty
+                // or we got ahead of the first key of the next split.
+                if(heap.isEmpty() || (firstKeyOfNextSplit != null && firstKeyOfNextSplit.compareTo(heap.peek().get(1)) <= 0))
+                    lastTime = true;
+
+                return getOutputTuple();
+
+            default: // In case of errors, we return the tuple as it is.
+                return baseInp;
+            }
+
+            // Every time we read something valid from the mapper, we add it to the heap.
+            heap.offer(rearranged);
+
+            if(firstTime){
+                setup(rearranged);
+                firstTime = false;
+                // The heap is now initialized with the first record from each of the relations.
+            }
+
+            // The algorithm is as follows:
+
+            /* while (there are records in the heap) {
+                   peek at the top record of the heap
+                   if we see a key change from the previous run, break to return output
+                   else,
+                       poll the tuple from the top of the heap and place it in a bag,
+                       based on which input it came from
+                       pull a record from the input that the polled tuple came from
+                       if (pulled key < first key of next split) insert it into the heap
+               }
+               output the final record
+             */
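+
+            /* For illustration (hypothetical data): if the base relation
+             * (index 0) contributes keys [1, 1, 2] and side relation 1
+             * contributes [1, 2], the tuples with key 1 are drained from the
+             * heap (side tuples first, since they sort smaller on equal keys)
+             * and the peek at key 2 triggers the output tuple for key 1.
+             */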
+
+            while(!heap.isEmpty()){
+
+                Tuple topOfHeap = heap.peek();
+
+                if(needToBreak(topOfHeap, prevTopOfHeap))
+                    break;
+
+                workingOnNewKey = false;
+                topOfHeap = heap.poll();
+                byte relIdx = (Byte)topOfHeap.get(0);
+                prevTopOfHeap = topOfHeap;
+                outBags[relIdx].add((Tuple)topOfHeap.get(2));
+
+                // Pull tuple from the corresponding loader.
+                if(relIdx == 0){
+                    Tuple tuple = heap.peek();
+                    /* At this point, it's possible to return the output tuple.
+                     * So, let's check if we can do so.
+                     * Remember that for tuples having equal keys, the tuple from
+                     * the leftmost relation is considered largest. So, if the tuple
+                     * we just peeked at from the heap is of the leftmost relation
+                     * and has a different key than the tuple we polled last, then
+                     * we can be assured that we have seen all the tuples belonging
+                     * to this key and thus we can generate the output tuple.
+                     * Else, just pull the next tuple from the left relation by
+                     * returning EOP.
+                     */
+                    // First check whether there is a tuple at the top of the
+                    // heap and whether it is from the left relation.
+                    if( (null != tuple) && (Byte)tuple.get(0) == 0){
+                        // Is the key null for both tuples?
+                        if(prevTopOfHeap.get(1) == null && tuple.get(1) == null){
+                            return new Result(POStatus.STATUS_EOP,null); 
+                        }
+                        // Does the key change from non-null to null, or from
+                        // one non-null value to another?
+                        if((prevTopOfHeap.get(1) == null && tuple.get(1) != null) || !tuple.get(1).equals(prevTopOfHeap.get(1))){
+                            return getOutputTuple();
+                        }
+                    }
+                    // Either the top of the heap is from a different relation,
+                    // or it is from the left relation but has the same key.
+                    return new Result(POStatus.STATUS_EOP,null); 
+                }
+                
+                Tuple nxtTuple = sideLoaders.get(relIdx-1).getNext();
+                if(nxtTuple == null) // EOF for this relation
+                    continue;
+
+                Tuple rearrangedTup = applyLRon(nxtTuple, relIdx);
+                Object key = rearrangedTup.get(1);
+
+                if(firstKeyOfNextSplit == null || null == key || firstKeyOfNextSplit.compareTo(key) > 0){
+                    heap.offer(rearrangedTup);
+                }
+            }
+
+            return getOutputTuple();
+        }
+
+        catch(IOException ioe){
+            throw new ExecException(ioe);
+        }
+    }
+
+    private Result getOutputTuple() throws ExecException{
+
+        workingOnNewKey = true;
+        createNewBags = true;
+
+        Tuple out = mTupleFactory.newTuple(relationCnt+1);
+
+        out.set(0, prevTopOfHeap.get(1));
+        for(int i=0; i < relationCnt; i++)
+            out.set(i+1,(outBags[i]));
+
+        return new Result(POStatus.STATUS_OK, out);        
+    }
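+
+    // For illustration (hypothetical data): with two input relations cogrouped
+    // on key "apple", the emitted tuple would look like
+    //   (apple, {(apple,1),(apple,3)}, {(apple,x)})
+    // i.e. the key at index 0 followed by one bag per input relation.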
+
+
+    private boolean needToBreak(final Tuple curTopOfHeap, final Tuple prevTopOfHeap) throws ExecException{
+
+        if(workingOnNewKey)
+            // This implies we just returned an output tuple in the previous
+            // call, so we are now working on a fresh key.
+            return false;
+
+        Object curKey = curTopOfHeap.get(1);
+        Object prevKey = prevTopOfHeap.get(1);
+
+        if (curKey == null && null == prevKey)
+            // If both keys are null, check whether they are coming from the same
+            // relation, because nulls from different relations are not considered equal.
+            return ! ((Byte)curTopOfHeap.get(0)).equals((Byte)prevTopOfHeap.get(0));
+
+
+        if(curKey == null || null == prevKey)
+            // If only one of them is null, then the key has changed from null to non-null.
+            return true;
+
+        // Both the keys are non-null.
+        return ! curKey.equals(prevKey);
+    }
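+
+    // For illustration (hypothetical data): needToBreak returns true when the
+    // key at the top of the heap changes, e.g. from (0, apple, ...) to
+    // (1, banana, ...), and also when two null keys come from different
+    // relations, e.g. (1, null, ...) followed by (0, null, ...).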
+
+
+    @SuppressWarnings( "unchecked")
+    private void setup(Tuple firstRearrangedTup) throws IOException{
+
+        // Read our own split index.
+        int  curSplitIdx = ((PigSplit)((Context)PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
+        Object firstBaseKey = firstRearrangedTup.get(1);
+        List<Pair<Integer,Tuple>> index = readIndex();
+
+        // If we are in the last split, firstKeyOfNextSplit is marked as null.
+        // A null value of firstKeyOfNextSplit is used to indicate that we
+        // should collect all tuples from the base loader as well as the side
+        // loaders and process them in this map.
+        // Note that nulls are smaller than anything else. So, if there are
+        // nulls in the data, they will be in the very first split. So, a null
+        // value of this variable can be used to determine whether we are
+        // working in the last split or not.
+
+        firstKeyOfNextSplit = getFirstKeyOfNextSplit(curSplitIdx, index); 
+
+        // Open all other streams.
+        // If this is the first split, start from the very first record.
+        // For all other splits, bind to the first key which is greater
+        // than or equal to the first key of the map.
+
+        for(int i=0; i < relationCnt-1; i ++){
+
+            LoadFunc loadfunc = (LoadFunc)PigContext.instantiateFuncFromSpec(sidFuncSpecs.get(i));
+            loadfunc.setUDFContextSignature(loaderSignatures.get(i));
+            Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConf));
+            loadfunc.setLocation(sideFileSpecs.get(i), dummyJob);
+            ((IndexableLoadFunc)loadfunc).initialize(dummyJob.getConfiguration());
+            sideLoaders.add(loadfunc);
+            Tuple rearranged;
+
+            if ( index.get(0).first.equals(curSplitIdx)){ 
+                // This is the first split, so bind at the very first record in all side relations.
+                Tuple t = loadfunc.getNext();
+                if(null == t)   // This side relation is entirely empty.
+                    continue;
+                rearranged = applyLRon(t, i+1);
+                heap.offer(rearranged);
+                continue;
+            }
+            else{
+                // This is not the first split; we need to bind to the key equal
+                // to firstBaseKey or the next key thereafter.
+
+                // First seek close to the base key.
+                ((IndexableLoadFunc)loadfunc).seekNear(firstBaseKey instanceof 
+                        Tuple ? (Tuple) firstBaseKey : mTupleFactory.newTuple(firstBaseKey));
+
+                // Since the contract of IndexableLoadFunc does not specify
+                // where we will land after the seekNear() call,
+                // we start reading from the side loader to get to the point
+                // where the key is actually greater than or equal to the base key.
+                while(true){
+                    Tuple t = loadfunc.getNext();
+                    if(t==null) // This relation has ended.
+                        break;
+                    rearranged = applyLRon(t, i+1);
+                    if(rearranged.get(1) == null) // If we got a null key here
+                        continue;             // it implies we are still behind.
+
+                    int cmpVal = ((Comparable<Object>)rearranged.get(1)).compareTo(firstBaseKey);
+                    if(cmpVal >= 0){  // Break away as soon as we get ahead.
+
+                        // Add this tuple to the heap only if it needs to be processed
+                        // in this map. That is, it needs to be smaller than the next
+                        // split's first key, unless this is the last split, in which
+                        // case it will be processed in this map.
+                        if(firstKeyOfNextSplit == null || firstKeyOfNextSplit.compareTo(rearranged.get(1)) > 0 ){
+                            heap.offer(rearranged);                
+                        }                        
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    private List<Pair<Integer,Tuple>> readIndex() throws ExecException{
+
+        // Assertions on the index we are about to read:
+        // We are reading the index from a file through POLoad, which will return tuples.
+        // These tuples look as follows:
+        // (key1, key2, key3,..,WritableComparable(wc),splitIdx(si))
+        // The index is sorted on key1, then on key2, key3, ..., wc, si.
+        // The index contains exactly one entry per split.
+        // Since this is loaded through CollectableLoadFunc, keys can't repeat.
+        // Thus, each index entry contains a unique key.
+        // Since nulls are smaller than anything else, if they are in the data,
+        // a key with a null value should be the very first entry of the index.
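+
+        // For illustration (hypothetical data): an index over three splits
+        // keyed on a single int column might contain the entries (1,wc,0),
+        // (40,wc,1) and (95,wc,2), i.e. the first key of each split, the
+        // WritableComparable, and the split index.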
+
+        // First create a loader to read index.
+        POLoad ld = new POLoad(new OperatorKey(this.mKey.scope,NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)), 
+                new FileSpec(indexFileName, idxFuncSpec));
+
+        // The index file is distributed through the Distributed Cache to all mappers, so read it locally.
+        Properties props = ConfigurationUtil.getLocalFSProperties();
+        ld.setPc(new PigContext(ExecType.LOCAL, props));
+
+        // Each index entry is read as a pair of the split index and a tuple consisting of the key.
+        List<Pair<Integer,Tuple>> index = new ArrayList<Pair<Integer,Tuple>>();
+
+        for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)){
+
+            Tuple  idxTuple = (Tuple)res.result;
+            int colCnt = idxTuple.size()-2;
+            Tuple keyTuple = mTupleFactory.newTuple(colCnt);
+
+            for (int i=0; i< colCnt; i++)
+                keyTuple.set(i, idxTuple.get(i));
+
+            index.add(new Pair<Integer, Tuple>((Integer)idxTuple.get(colCnt+1), keyTuple));
+        }
+
+        return index;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Comparable<Object> getFirstKeyOfNextSplit(final int curSplitIdx, final List<Pair<Integer,Tuple>> index) throws IOException{
+
+        // First find out the index entry corresponding to our current split.
+        int i;
+        for(i=0; i < index.size(); i++){
+            if(index.get(i).first.equals(curSplitIdx))
+                break;
+        }
+
+        // Now read the key of the very next index entry.
+        if(i < index.size()-1){
+            Tuple keyTuple =  index.get(i+1).second;
+            return keyTuple.size() == 1 ? (Comparable<Object>)keyTuple.get(0) : keyTuple;
+        }
+
+        // If we are here, it implies the current split is the last split.
+        return null;
+
+    }
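+
+    // For illustration (hypothetical data): given index entries whose first
+    // keys are 1, 40 and 95 for splits 0, 1 and 2, a call with curSplitIdx=1
+    // returns 95, while a call with curSplitIdx=2 (the last split) returns null.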
+
+    private Tuple applyLRon(final Tuple inp, final int lrIdx) throws ExecException{
+
+        // Separate the key and value of the input using the corresponding LR operator.
+        POLocalRearrange lr = LRs[lrIdx];
+        lr.attachInput(inp);
+        Result lrOut = lr.getNext(dummyTuple);
+
+        if(lrOut.returnStatus!=POStatus.STATUS_OK){
+            int errCode = 2167;
+            String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
+            throw new ExecException(errMsg,errCode,PigException.BUG);
+        } 
+
+        return mTupleFactory.newTuple(((Tuple)lrOut.result).getAll());
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitMergeCoGroup(this);
+    }
+
+    @Override
+    public String name() {
+        return "MergeCogroup["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    public List<PhysicalPlan> getLRInnerPlansOf(int i) {
+        return this.LRs[i].getPlans();
+    }
+
+    public void setSideLoadFuncs(List<FuncSpec> sideLoadFuncs) {
+        this.sidFuncSpecs = sideLoadFuncs;
+    }
+
+    public void setSideFileSpecs(List<String> sideFileSpecs) {
+        this.sideFileSpecs = sideFileSpecs;
+    }
+
+    public String getIndexFileName() {
+        return indexFileName;
+    }
+
+    public void setIndexFileName(String indexFileName) {
+        this.indexFileName = indexFileName;
+    }
+
+    public FuncSpec getIdxFuncSpec() {
+        return idxFuncSpec;
+    }
+
+    public void setIdxFuncSpec(FuncSpec idxFileSpec) {
+        this.idxFuncSpec = idxFileSpec;
+    }
+
+    public void setLoaderSignatures(List<String> loaderSignatures) {
+        this.loaderSignatures = loaderSignatures;
+    }
+
+    private void readObject(final ObjectInputStream is) throws IOException,
+    ClassNotFoundException, ExecException {
+
+        is.defaultReadObject();
+        mTupleFactory = TupleFactory.getInstance();
+        this.heap = new PriorityQueue<Tuple>(11, new Comparator<Tuple>() {
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public int compare(final Tuple resLHS, final Tuple resRHS) {
+                try {
+                    // First, about null keys.
+                    // Java's priority queue doesn't handle null, so we handle it ourselves.
+
+                    // If both keys are null, then the key from the side loader is
+                    // considered smaller. That is so because when we poll the heap,
+                    // we want to get the keys of the side loaders first and then
+                    // the keys of the base loader.
+
+                    // If null is compared against non-null, null is smaller. 
+
+                    Object leftKey = resLHS.get(1);
+                    Object rightKey = resRHS.get(1);
+
+                    if (null == leftKey && null == rightKey){
+                        return ((Byte)resRHS.get(0)).compareTo((Byte)resLHS.get(0));
+                    }
+                    if(null == leftKey)
+                        return -1;
+                    if(null == rightKey)
+                        return 1;
+
+                    // Now, about non-null keys.
+                    // Compare the keys, which are at index 1 of the tuples
+                    // being put in the heap.
+
+                    int cmpval = ((Comparable<Object>)leftKey).compareTo(rightKey);
+
+                    // If keys are equal, tuples from side relations are
+                    // considered smaller. This is so because we want to get
+                    // back the mapper's tuple last when polling tuples from
+                    // the heap, and the index of the mapped relation is 0.
+
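+                    // For illustration (hypothetical data): with equal keys,
+                    // (1, apple, ...) sorts before (0, apple, ...), so the side
+                    // relation's tuple is polled from the heap first.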
+                    return cmpval == 0 ? ((Byte)resRHS.get(0)).compareTo((Byte)resLHS.get(0)) : cmpval;
+                } catch (ExecException e) {
+
+                    // Alas, no choice but to throw a runtime exception.
+                    String errMsg = "Exception occurred in compare() of heap in POMergeCogroup.";
+                    throw new RuntimeException(errMsg,e);
+                }
+            } 
+        });
+
+        this.createNewBags = true;
+        this.lastTime = false;
+        this.relationCnt = LRs.length;
+        this.outBags = new DataBag[relationCnt];
+        this.firstTime = true;
+        this.workingOnNewKey = true;
+        this.sideLoaders = new ArrayList<LoadFunc>();
+    }
+
+    // This function is only for debugging. Call it whenever you want to print
+    // the current state of the heap.
+    int counter = 0;
+    private void printHeap(){
+        System.out.println("Printing heap: " + ++counter);
+        PriorityQueue<Tuple> copy = new PriorityQueue<Tuple>(heap);
+        System.out.println("Heap size: " + heap.size());
+        int i = 0;
+        while(!copy.isEmpty()){
+            System.out.println((i++) + "th item in heap: " + copy.poll());
+        }
+    }
+}

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java Fri Jul  9 17:48:23 2010
@@ -25,11 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.WritableComparable;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Fri Jul  9 17:48:23 2010
@@ -50,7 +50,8 @@ public class LOCogroup extends Relationa
      */
     public static enum GROUPTYPE {
         REGULAR,    // Regular (co)group
-        COLLECTED   // Collected group
+        COLLECTED,   // Collected group
+        MERGE       // Map-side CoGroup on sorted data
     };
 
     /**

Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Jul  9 17:48:23 2010
@@ -268,8 +268,14 @@ public class QueryParser {
             return cogroup;
         }
 
+        else if (modifier.equalsIgnoreCase("merge")){
+            LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.MERGE);
+            cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
+            return cogroup;
+        }
+        
         else{
-            throw new ParseException("Only COLLECTED or REGULAR are valid GROUP modifiers.");
+            throw new ParseException("Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers.");
         }
     }
     
@@ -1809,6 +1815,10 @@ LogicalOperator CogroupClause(LogicalPla
             log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes."); 
             cogroup = parseUsingForGroupBy("regular", gis, lp);
             }
+         |("\"merge\"") {
+            log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes."); 
+            cogroup = parseUsingForGroupBy("merge", gis, lp);
+            }
         )])
     )