You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ha...@apache.org on 2010/03/16 18:09:17 UTC

svn commit: r923872 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/impl/builtin/ t...

Author: hashutosh
Date: Tue Mar 16 17:09:16 2010
New Revision: 923872

URL: http://svn.apache.org/viewvc?rev=923872&view=rev
Log:
PIG-1292: Interface Refinements (hashutosh)

Added:
    hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java
    hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
    hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar 16 17:09:16 2010
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
 
 INCOMPATIBLE CHANGES
 
+PIG-1292: Interface Refinements (hashutosh)
+
 PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a
 Tuple as its only sub field (the tuple itself can have a schema with > 1
 subfields) (pradeepkth)

Added: hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java?rev=923872&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -0,0 +1,21 @@
+package org.apache.pig;
+
+import java.io.IOException;
+
+/**
+ * This interface, when implemented by a {@link LoadFunc}, indicates to Pig
+ * that the loader is capable of loading data such that all instances of a
+ * key will occur in the same split.
+ * @since Pig 0.7
+ */
+public interface CollectableLoadFunc {
+
+    /**
+     * By calling this method, Pig tells the loader that it must load data
+     * such that all instances of a key are in the same split. Pig makes no
+     * further checks at runtime to verify whether this contract is honored
+     * or not.
+     * @throws IOException
+     */
+    public void ensureAllKeyInstancesInSameSplit() throws IOException;
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -19,22 +19,22 @@ package org.apache.pig;
 
 import java.io.IOException;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 
 /**
  * This class provides an implementation of OrderedLoadFunc interface
  * which can be optionally re-used by LoadFuncs that use FileInputFormat, by
  * having this as a super class
  */
-public abstract class FileInputLoadFunc extends OrderedLoadFunc {
+public abstract class FileInputLoadFunc extends LoadFunc implements OrderedLoadFunc  {
     
     @Override
-    public WritableComparable<?> getSplitComparable(PigSplit split)
+    public WritableComparable<?> getSplitComparable(InputSplit split)
     throws IOException{
         FileSplit fileSplit = null;
-        if(split.getWrappedSplit() instanceof FileSplit){
-            fileSplit = (FileSplit)split.getWrappedSplit();
+        if(split instanceof FileSplit){
+            fileSplit = (FileSplit)split;
         }else{
             throw new RuntimeException("LoadFunc expected split of type FileSplit");
         }
@@ -46,5 +46,3 @@ public abstract class FileInputLoadFunc 
     }
 
 }
-
-

Modified: hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -31,15 +31,15 @@ import org.apache.pig.data.Tuple;
  * needs to perform the merge based join.
  * 
  * The sequence of calls made from the pig runtime are:
- * {@link IndexableLoadFunc#setUDFContextSignature(String)}
+ * {@link LoadFunc#setUDFContextSignature(String)}
  * {@link IndexableLoadFunc#initialize(Configuration)}
- * {@link IndexableLoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
+ * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
  * {@link IndexableLoadFunc#seekNear(Tuple)}
  * A series of IndexableLoadFunc.getNext(); calls to perform the join
  * IndexableLoadFunc.close(); 
  * 
  */
-public abstract class IndexableLoadFunc extends LoadFunc {
+public interface IndexableLoadFunc {
     
     /**
      * This method is called by pig run time to allow the

Modified: hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -21,7 +21,8 @@ package org.apache.pig;
 import java.io.IOException;
 
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+
 
 /**
  * Implementing this interface indicates to Pig that a given loader
@@ -29,8 +30,9 @@ import org.apache.pig.backend.hadoop.exe
  * WritableComparable object is stored in the index created by
  *  MergeJoin sampling MR job to get an ordered sequence of splits.
  * This is necessary when the sort key spans multiple splits.
+ * @since Pig 0.7
  */
-public abstract class OrderedLoadFunc extends LoadFunc {
+public interface OrderedLoadFunc {
 
     /**
      * The WritableComparable object returned will be used to compare
@@ -39,7 +41,7 @@ public abstract class OrderedLoadFunc ex
      * @return WritableComparable representing the position of the split in input
      * @throws IOException
      */
-    public abstract WritableComparable<?> getSplitComparable(PigSplit split) 
+    public WritableComparable<?> getSplitComparable(InputSplit split) 
     throws IOException;
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Mar 16 17:09:16 2010
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -906,18 +907,61 @@ public class MRCompiler extends PhyPlanV
 
     @Override
     public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
-        try{
-            nonBlocking(op);
-            List<PhysicalPlan> plans = op.getPlans();
-            if(plans!=null)
-                for(PhysicalPlan ep : plans)
-                    addUDFs(ep);
-            phyToMROpMap.put(op, curMROp);
-        }catch(Exception e){
-            int errCode = 2034;
-            String msg = "Error compiling operator " + op.getClass().getSimpleName();
-            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        
+        if(!curMROp.mapDone){
+            
+            List<PhysicalOperator> roots = curMROp.mapPlan.getRoots();
+            if(roots.size() != 1){
+                int errCode = 2171;
+                String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+            }
+            
+            PhysicalOperator phyOp = roots.get(0);
+            if(! (phyOp instanceof POLoad)){
+                int errCode = 2172;
+                String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
+                throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+            }
+            
+            POLoad loader = (POLoad)phyOp;
+            LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec());
+            try {
+                if(!(loadFunc instanceof CollectableLoadFunc)){
+                    throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.");
+                }
+                ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
+            } catch (MRCompilerException e){
+                throw (e);
+            } catch (IOException e) {
+                int errCode = 2034;
+                String msg = "Error compiling operator " + op.getClass().getSimpleName();
+                throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+            }
+
+            try{
+                nonBlocking(op);
+                List<PhysicalPlan> plans = op.getPlans();
+                if(plans!=null)
+                    for(PhysicalPlan ep : plans)
+                        addUDFs(ep);
+                phyToMROpMap.put(op, curMROp);
+            }catch(Exception e){
+                int errCode = 2034;
+                String msg = "Error compiling operator " + op.getClass().getSimpleName();
+                throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+            }    
+        }
+        else if(!curMROp.reduceDone){
+            String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'.";
+            throw new MRCompilerException(msg, PigException.BUG);   
+        }
+        else{
+            int errCode = 2022;
+            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+            throw new MRCompilerException(msg, errCode, PigException.BUG);   
         }
+        
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Tue Mar 16 17:09:16 2010
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -57,7 +56,7 @@ public class MergeJoinIndexer  extends L
     private PhysicalOperator rightPipelineLeaf;
     private PhysicalOperator rightPipelineRoot;
     private Tuple dummyTuple = null;
-    private OrderedLoadFunc loader;
+    private LoadFunc loader;
     private PigSplit pigSplit = null;
     
     /** @param funcSpec : Loader specification.
@@ -70,7 +69,7 @@ public class MergeJoinIndexer  extends L
     @SuppressWarnings("unchecked")
     public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{
         
-        loader = (OrderedLoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
+        loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
         try {
             List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
             lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
@@ -101,7 +100,7 @@ public class MergeJoinIndexer  extends L
         
         if(!firstRec)   // We sample only one record per block.
             return null;
-        WritableComparable<?> position = loader.getSplitComparable(pigSplit);
+        WritableComparable<?> position = ((OrderedLoadFunc)loader).getSplitComparable(pigSplit.getWrappedSplit());
         Object key = null;
         Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
         

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Tue Mar 16 17:09:16 2010
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -69,7 +70,7 @@ public class POMergeJoin extends Physica
     //The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
 
-    private transient IndexableLoadFunc rightLoader;
+    private transient LoadFunc rightLoader;
     private OperatorKey opKey;
 
     private Object prevLeftKey;
@@ -245,7 +246,7 @@ public class POMergeJoin extends Physica
                         else{           // This is end of all input and this is last join output.
                             // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                             try {
-                                rightLoader.close();
+                                ((IndexableLoadFunc)rightLoader).close();
                             } catch (IOException e) {
                                 // Non-fatal error. We can continue.
                                 log.error("Received exception while trying to close right side file: " + e.getMessage());
@@ -377,7 +378,7 @@ public class POMergeJoin extends Physica
                 if(this.parentPlan.endOfAllInput){  // This is end of all input and this is last time we will read right input.
                     // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                     try {
-                        rightLoader.close();
+                        ((IndexableLoadFunc)rightLoader).close();
                     } catch (IOException e) {
                      // Non-fatal error. We can continue.
                         log.error("Received exception while trying to close right side file: " + e.getMessage());
@@ -389,7 +390,7 @@ public class POMergeJoin extends Physica
     }
     
     private void seekInRightStream(Object firstLeftKey) throws IOException{
-        rightLoader = (IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+        rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
         
         // check if hadoop distributed cache is used
         if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
@@ -399,12 +400,11 @@ public class POMergeJoin extends Physica
         
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
-        Configuration conf = new Configuration(PigMapReduce.sJobConf);
         rightLoader.setUDFContextSignature(signature);
-        Job job = new Job(conf);
+        Job job = new Job(new Configuration(PigMapReduce.sJobConf));
         rightLoader.setLocation(rightInputFileName, job);
-        rightLoader.initialize(job.getConfiguration());
-        rightLoader.seekNear(
+        ((IndexableLoadFunc)rightLoader).initialize(job.getConfiguration());
+        ((IndexableLoadFunc)rightLoader).seekNear(
                 firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : mTupleFactory.newTuple(firstLeftKey));
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Tue Mar 16 17:09:16 2010
@@ -57,7 +57,7 @@ import org.apache.pig.impl.util.ObjectSe
  * finds the splitIndex that can contain the key and initializes ReadToEndLoader
  * to read from that splitIndex onwards , in the sequence of splits in the index
  */
-public class DefaultIndexableLoader extends IndexableLoadFunc {
+public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{
 
     
     // FileSpec of index file which will be read from HDFS.

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Tue Mar 16 17:09:16 2010
@@ -24,9 +24,8 @@ import java.util.List;
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.pig.EvalFunc;
+import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ExecType;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
@@ -34,8 +33,11 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.utils.LogicalPlanTester;
 import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -79,6 +81,20 @@ public class TestCollectedGroup extends 
         Util.deleteFile(cluster, INPUT_FILE);
     }
     
+    public void testNonCollectableLoader() throws Exception{ // expects MR compilation of a 'collected' group to fail when the LOAD names no loader
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);"); // no USING clause here, unlike the map-side group tests
+        LogicalPlan lp = lpt.buildPlan("B = group A by id using 'collected';");
+        PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
+        pc.connect();
+        try {
+            Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);  
+            fail("Must throw MRCompiler Exception"); // reached only when plan building did not throw
+        } catch (Exception e) {
+            assertTrue(e instanceof MRCompilerException); // any other exception type fails the test
+        }
+    }
+
     public void testCollectedGrpSpecifiedInSingleQuotes1(){
         
         LogicalPlanTester lpt = new LogicalPlanTester();
@@ -149,8 +165,7 @@ public class TestCollectedGroup extends 
     public void testMapsideGroupByOneColumn() throws IOException{
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
-        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
-    
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
         try {
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
@@ -176,6 +191,7 @@ public class TestCollectedGroup extends 
             Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
 
         } catch (Exception e) {
+            e.printStackTrace();
             fail(e.getMessage());
         }
     }
@@ -183,8 +199,8 @@ public class TestCollectedGroup extends 
     public void testMapsideGroupByMultipleColumns() throws IOException{
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
-        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
-    
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+        
         try {
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
@@ -210,6 +226,7 @@ public class TestCollectedGroup extends 
             Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
 
         } catch (Exception e) {
+            e.printStackTrace();
             fail(e.getMessage());
         }
     }
@@ -217,8 +234,8 @@ public class TestCollectedGroup extends 
     public void testMapsideGroupByStar() throws IOException{
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
-        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
-    
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+        
         try {
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
@@ -244,8 +261,16 @@ public class TestCollectedGroup extends 
             Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
 
         } catch (Exception e) {
+            e.printStackTrace();
             fail(e.getMessage());
         }
     }
 
+    public static class DummyCollectableLoader extends PigStorage implements CollectableLoadFunc{ // loader named in the LOAD statements of the map-side group tests
+        // The method body below is empty: this loader does no work when the method is called.
+        @Override
+        public void ensureAllKeyInstancesInSameSplit() throws IOException {
+        }
+        
+    }
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Tue Mar 16 17:09:16 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.pig.ExecType;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -593,7 +594,7 @@ public class TestMergeJoin {
      * that expressions are not allowed as merge join keys when the right input's
      * loader implements {@link IndexableLoadFunc}
      */
-    public static class DummyIndexableLoader extends IndexableLoadFunc {
+    public static class DummyIndexableLoader extends LoadFunc implements IndexableLoadFunc{
 
         /**
          *