You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ha...@apache.org on 2010/03/16 18:09:17 UTC
svn commit: r923872 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/impl/builtin/ t...
Author: hashutosh
Date: Tue Mar 16 17:09:16 2010
New Revision: 923872
URL: http://svn.apache.org/viewvc?rev=923872&view=rev
Log:
PIG-1292: Interface Refinements (hashutosh)
Added:
hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java
hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar 16 17:09:16 2010
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+PIG-1292: Interface Refinements (hashutosh)
+
PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a
Tuple as its only sub field (the tuple itself can have a schema with > 1
subfields) (pradeepkth)
Added: hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java?rev=923872&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/CollectableLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -0,0 +1,21 @@
+package org.apache.pig;
+
+import java.io.IOException;
+
+/**
+ * This interface, when implemented by a {@link LoadFunc}, indicates to Pig
+ * that the loader is capable of loading data such that all instances of a
+ * key occur in the same split.
+ * @since Pig 0.7
+ */
+public interface CollectableLoadFunc {
+
+ /**
+ * When this method is called, Pig is telling the loader that it must load
+ * data such that all instances of a key are in the same split. Pig makes
+ * no further checks at runtime to verify whether this contract is
+ * honored.
+ * @throws IOException
+ */
+ public void ensureAllKeyInstancesInSameSplit() throws IOException;
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/FileInputLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -19,22 +19,22 @@ package org.apache.pig;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
/**
* This class provides an implementation of OrderedLoadFunc interface
* which can be optionally re-used by LoadFuncs that use FileInputFormat, by
* having this as a super class
*/
-public abstract class FileInputLoadFunc extends OrderedLoadFunc {
+public abstract class FileInputLoadFunc extends LoadFunc implements OrderedLoadFunc {
@Override
- public WritableComparable<?> getSplitComparable(PigSplit split)
+ public WritableComparable<?> getSplitComparable(InputSplit split)
throws IOException{
FileSplit fileSplit = null;
- if(split.getWrappedSplit() instanceof FileSplit){
- fileSplit = (FileSplit)split.getWrappedSplit();
+ if(split instanceof FileSplit){
+ fileSplit = (FileSplit)split;
}else{
throw new RuntimeException("LoadFunc expected split of type FileSplit");
}
@@ -46,5 +46,3 @@ public abstract class FileInputLoadFunc
}
}
-
-
Modified: hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/IndexableLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -31,15 +31,15 @@ import org.apache.pig.data.Tuple;
* needs to perform the merge based join.
*
* The sequence of calls made from the pig runtime are:
- * {@link IndexableLoadFunc#setUDFContextSignature(String)}
+ * {@link LoadFunc#setUDFContextSignature(String)}
* {@link IndexableLoadFunc#initialize(Configuration)}
- * {@link IndexableLoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
+ * {@link LoadFunc#setLocation(String, org.apache.hadoop.mapreduce.Job)}
* {@link IndexableLoadFunc#seekNear(Tuple)}
* A series of IndexableLoadFunc.getNext(); calls to perform the join
* IndexableLoadFunc.close();
*
*/
-public abstract class IndexableLoadFunc extends LoadFunc {
+public interface IndexableLoadFunc {
/**
* This method is called by pig run time to allow the
Modified: hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/OrderedLoadFunc.java Tue Mar 16 17:09:16 2010
@@ -21,7 +21,8 @@ package org.apache.pig;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+
/**
* Implementing this interface indicates to Pig that a given loader
@@ -29,8 +30,9 @@ import org.apache.pig.backend.hadoop.exe
* WritableComparable object is stored in the index created by
* MergeJoin sampling MR job to get an ordered sequence of splits.
* This is necessary when the sort key spans multiple splits.
+ * @since Pig 0.7
*/
-public abstract class OrderedLoadFunc extends LoadFunc {
+public interface OrderedLoadFunc {
/**
* The WritableComparable object returned will be used to compare
@@ -39,7 +41,7 @@ public abstract class OrderedLoadFunc ex
* @return WritableComparable representing the position of the split in input
* @throws IOException
*/
- public abstract WritableComparable<?> getSplitComparable(PigSplit split)
+ public WritableComparable<?> getSplitComparable(InputSplit split)
throws IOException;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Mar 16 17:09:16 2010
@@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
@@ -906,18 +907,61 @@ public class MRCompiler extends PhyPlanV
@Override
public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
- try{
- nonBlocking(op);
- List<PhysicalPlan> plans = op.getPlans();
- if(plans!=null)
- for(PhysicalPlan ep : plans)
- addUDFs(ep);
- phyToMROpMap.put(op, curMROp);
- }catch(Exception e){
- int errCode = 2034;
- String msg = "Error compiling operator " + op.getClass().getSimpleName();
- throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+
+ if(!curMROp.mapDone){
+
+ List<PhysicalOperator> roots = curMROp.mapPlan.getRoots();
+ if(roots.size() != 1){
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ PhysicalOperator phyOp = roots.get(0);
+ if(! (phyOp instanceof POLoad)){
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ POLoad loader = (POLoad)phyOp;
+ LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec());
+ try {
+ if(!(loadFunc instanceof CollectableLoadFunc)){
+ throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.");
+ }
+ ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
+ } catch (MRCompilerException e){
+ throw (e);
+ } catch (IOException e) {
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+
+ try{
+ nonBlocking(op);
+ List<PhysicalPlan> plans = op.getPlans();
+ if(plans!=null)
+ for(PhysicalPlan ep : plans)
+ addUDFs(ep);
+ phyToMROpMap.put(op, curMROp);
+ }catch(Exception e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + op.getClass().getSimpleName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ else if(!curMROp.reduceDone){
+ String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'.";
+ throw new MRCompilerException(msg, PigException.BUG);
+ }
+ else{
+ int errCode = 2022;
+ String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+ throw new MRCompilerException(msg, errCode, PigException.BUG);
}
+
}
@Override
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Tue Mar 16 17:09:16 2010
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -57,7 +56,7 @@ public class MergeJoinIndexer extends L
private PhysicalOperator rightPipelineLeaf;
private PhysicalOperator rightPipelineRoot;
private Tuple dummyTuple = null;
- private OrderedLoadFunc loader;
+ private LoadFunc loader;
private PigSplit pigSplit = null;
/** @param funcSpec : Loader specification.
@@ -70,7 +69,7 @@ public class MergeJoinIndexer extends L
@SuppressWarnings("unchecked")
public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{
- loader = (OrderedLoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
+ loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
try {
List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
@@ -101,7 +100,7 @@ public class MergeJoinIndexer extends L
if(!firstRec) // We sample only one record per block.
return null;
- WritableComparable<?> position = loader.getSplitComparable(pigSplit);
+ WritableComparable<?> position = ((OrderedLoadFunc)loader).getSplitComparable(pigSplit.getWrappedSplit());
Object key = null;
Tuple wrapperTuple = mTupleFactory.newTuple(keysCnt+2);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Tue Mar 16 17:09:16 2010
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -69,7 +70,7 @@ public class POMergeJoin extends Physica
//The Local Rearrange operators modeling the join key
private POLocalRearrange[] LRs;
- private transient IndexableLoadFunc rightLoader;
+ private transient LoadFunc rightLoader;
private OperatorKey opKey;
private Object prevLeftKey;
@@ -245,7 +246,7 @@ public class POMergeJoin extends Physica
else{ // This is end of all input and this is last join output.
// Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
try {
- rightLoader.close();
+ ((IndexableLoadFunc)rightLoader).close();
} catch (IOException e) {
// Non-fatal error. We can continue.
log.error("Received exception while trying to close right side file: " + e.getMessage());
@@ -377,7 +378,7 @@ public class POMergeJoin extends Physica
if(this.parentPlan.endOfAllInput){ // This is end of all input and this is last time we will read right input.
// Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
try {
- rightLoader.close();
+ ((IndexableLoadFunc)rightLoader).close();
} catch (IOException e) {
// Non-fatal error. We can continue.
log.error("Received exception while trying to close right side file: " + e.getMessage());
@@ -389,7 +390,7 @@ public class POMergeJoin extends Physica
}
private void seekInRightStream(Object firstLeftKey) throws IOException{
- rightLoader = (IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+ rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
// check if hadoop distributed cache is used
if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
@@ -399,12 +400,11 @@ public class POMergeJoin extends Physica
// Pass signature of the loader to rightLoader
// make a copy of the conf to use in calls to rightLoader.
- Configuration conf = new Configuration(PigMapReduce.sJobConf);
rightLoader.setUDFContextSignature(signature);
- Job job = new Job(conf);
+ Job job = new Job(new Configuration(PigMapReduce.sJobConf));
rightLoader.setLocation(rightInputFileName, job);
- rightLoader.initialize(job.getConfiguration());
- rightLoader.seekNear(
+ ((IndexableLoadFunc)rightLoader).initialize(job.getConfiguration());
+ ((IndexableLoadFunc)rightLoader).seekNear(
firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : mTupleFactory.newTuple(firstLeftKey));
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Tue Mar 16 17:09:16 2010
@@ -57,7 +57,7 @@ import org.apache.pig.impl.util.ObjectSe
* finds the splitIndex that can contain the key and initializes ReadToEndLoader
* to read from that splitIndex onwards , in the sequence of splits in the index
*/
-public class DefaultIndexableLoader extends IndexableLoadFunc {
+public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{
// FileSpec of index file which will be read from HDFS.
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Tue Mar 16 17:09:16 2010
@@ -24,9 +24,8 @@ import java.util.List;
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.pig.EvalFunc;
+import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.ExecType;
-import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
@@ -34,8 +33,11 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.utils.LogicalPlanTester;
import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
@@ -79,6 +81,20 @@ public class TestCollectedGroup extends
Util.deleteFile(cluster, INPUT_FILE);
}
+ public void testNonCollectableLoader() throws Exception{
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+ LogicalPlan lp = lpt.buildPlan("B = group A by id using 'collected';");
+ PigContext pc = new PigContext(ExecType.MAPREDUCE,cluster.getProperties());
+ pc.connect();
+ try {
+ Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);
+ fail("Must throw MRCompiler Exception");
+ } catch (Exception e) {
+ assertTrue(e instanceof MRCompilerException);
+ }
+ }
+
public void testCollectedGrpSpecifiedInSingleQuotes1(){
LogicalPlanTester lpt = new LogicalPlanTester();
@@ -149,8 +165,7 @@ public class TestCollectedGroup extends
public void testMapsideGroupByOneColumn() throws IOException{
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
-
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
try {
DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
DataBag dbshj = BagFactory.getInstance().newDefaultBag();
@@ -176,6 +191,7 @@ public class TestCollectedGroup extends
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
} catch (Exception e) {
+ e.printStackTrace();
fail(e.getMessage());
}
}
@@ -183,8 +199,8 @@ public class TestCollectedGroup extends
public void testMapsideGroupByMultipleColumns() throws IOException{
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
-
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+
try {
DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
DataBag dbshj = BagFactory.getInstance().newDefaultBag();
@@ -210,6 +226,7 @@ public class TestCollectedGroup extends
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
} catch (Exception e) {
+ e.printStackTrace();
fail(e.getMessage());
}
}
@@ -217,8 +234,8 @@ public class TestCollectedGroup extends
public void testMapsideGroupByStar() throws IOException{
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
-
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
+
try {
DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
DataBag dbshj = BagFactory.getInstance().newDefaultBag();
@@ -244,8 +261,16 @@ public class TestCollectedGroup extends
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
} catch (Exception e) {
+ e.printStackTrace();
fail(e.getMessage());
}
}
+ public static class DummyCollectableLoader extends PigStorage implements CollectableLoadFunc{
+
+ @Override
+ public void ensureAllKeyInstancesInSameSplit() throws IOException {
+ }
+
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=923872&r1=923871&r2=923872&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Tue Mar 16 17:09:16 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.pig.ExecType;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -593,7 +594,7 @@ public class TestMergeJoin {
* that expressions are not allowed as merge join keys when the right input's
* loader implements {@link IndexableLoadFunc}
*/
- public static class DummyIndexableLoader extends IndexableLoadFunc {
+ public static class DummyIndexableLoader extends LoadFunc implements IndexableLoadFunc{
/**
*