You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ha...@apache.org on 2010/07/09 19:48:24 UTC
svn commit: r962618 [1/2] - in /hadoop/pig/branches/branch-0.7: ./
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ ...
Author: hashutosh
Date: Fri Jul 9 17:48:23 2010
New Revision: 962618
URL: http://svn.apache.org/viewvc?rev=962618&view=rev
Log:
PIG-1309: Map-side Cogroup
Added:
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestMapSideCogroup.java
Modified:
hadoop/pig/branches/branch-0.7/CHANGES.txt
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/utils/TestHelper.java
Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Jul 9 17:48:23 2010
@@ -71,6 +71,8 @@ manner (rding via pradeepkth)
IMPROVEMENTS
+PIG-1309: Map-side Cogroup (hashutosh)
+
PIG-1441: new test targets (olgan)
PIG-1381: Need a way for Pig to take an alternative property file (daijy)
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Jul 9 17:48:23 2010
@@ -25,6 +25,7 @@ import java.util.Properties;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
public class ConfigurationUtil {
@@ -62,4 +63,12 @@ public class ConfigurationUtil {
}
}
+
+ public static Properties getLocalFSProperties() {
+ Configuration localConf = new Configuration(false);
+ localConf.addResource("core-default.xml");
+ Properties props = ConfigurationUtil.toProperties(localConf);
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+ return props;
+ }
}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Jul 9 17:48:23 2010
@@ -59,6 +59,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
@@ -812,7 +813,7 @@ public class JobControlCompiler{
// global sort set (because in that case it's the sampling job) or if
// it's a limit after a sort.
boolean hasOrderBy = false;
- if (mro.isGlobalSort() || mro.isLimitAfterSort()) {
+ if (mro.isGlobalSort() || mro.isLimitAfterSort() || mro.usingTypedComparator()) {
hasOrderBy = true;
} else {
List<MapReduceOper> succs = plan.getSuccessors(mro);
@@ -1106,6 +1107,29 @@ public class JobControlCompiler{
throw new VisitorException(msg, e);
}
}
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+ throws VisitorException {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ String indexFile = mergeCoGrp.getIndexFileName();
+
+ if (indexFile == null) throw new VisitorException("No index file");
+
+ try {
+ String symlink = addSingleFileToDistributedCache(pigContext,
+ conf, indexFile, "indexfile_mergecogrp_");
+ mergeCoGrp.setIndexFileName(symlink);
+ } catch (IOException e) {
+ String msg = "Internal error. Distributed cache could not " +
+ "be set up for merge cogrp index file";
+ throw new VisitorException(msg, e);
+ }
+ }
}
}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Jul 9 17:48:23 2010
@@ -61,6 +61,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
@@ -766,7 +767,7 @@ public class MRCompiler extends PhyPlanV
}
}
- public void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
+ private void connectMapToReduceLimitedSort(MapReduceOper mro, MapReduceOper sortMROp) throws PlanException, VisitorException
{
POLocalRearrange slr = (POLocalRearrange)sortMROp.mapPlan.getLeaves().get(0);
@@ -795,7 +796,7 @@ public class MRCompiler extends PhyPlanV
mro.reducePlan.addAsLeaf(getPlainForEachOP());
}
- public void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
+ private void simpleConnectMapToReduce(MapReduceOper mro) throws PlanException
{
PhysicalPlan ep = new PhysicalPlan();
POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -830,7 +831,7 @@ public class MRCompiler extends PhyPlanV
mro.reducePlan.addAsLeaf(getPlainForEachOP());
}
- public POForEach getPlainForEachOP()
+ private POForEach getPlainForEachOP()
{
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat1 = new ArrayList<Boolean>();
@@ -925,12 +926,12 @@ public class MRCompiler extends PhyPlanV
}
POLoad loader = (POLoad)phyOp;
- LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec());
- try {
- if(!(loadFunc instanceof CollectableLoadFunc)){
- throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.");
- }
- loadFunc.setUDFContextSignature(loader.getSignature());
+ Object loadFunc = PigContext.instantiateFuncFromSpec(loader.getLFile().getFuncSpec());
+ try {
+ if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+ throw new MRCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.");
+ }
+ ((LoadFunc)loadFunc).setUDFContextSignature(loader.getSignature());
((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
} catch (MRCompilerException e){
throw (e);
@@ -1082,6 +1083,184 @@ public class MRCompiler extends PhyPlanV
}
}
+ /** The leftmost relation is referred to as the base relation (this is the one fed into mappers).
+ * First, remove all MROpers except the first one (referred to as baseMROp) from the MRPlan.
+ * Then, create an MROper which will do the indexing job (indexerMROp).
+ * Connect indexerMROp before baseMROp in the MRPlan.
+ */
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException {
+
+ if(compiledInputs.length < 2){
+ String errMsg = "Merge Cogroup work on two or more relations." +
+ "To use map-side group-by on single relation, use 'collected' qualifier.";
+ throw new MRCompilerException(errMsg);
+ }
+
+ List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
+ List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
+ List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
+
+ try{
+ // Iterate through all the MROpers, disconnect side MROPers from
+ // MROPerPlan and collect all the information needed in different lists.
+
+ for(int i=0 ; i < compiledInputs.length; i++){
+
+ MapReduceOper mrOper = compiledInputs[i];
+ PhysicalPlan mapPlan = mrOper.mapPlan;
+ if(mapPlan.getRoots().size() != 1){
+ int errCode = 2171;
+ String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+ throw new MRCompilerException(errMsg,errCode,PigException.BUG);
+ }
+
+ PhysicalOperator rootPOOp = mapPlan.getRoots().get(0);
+ if(! (rootPOOp instanceof POLoad)){
+ int errCode = 2172;
+ String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
+ throw new MRCompilerException(errMsg,errCode);
+ }
+
+ POLoad sideLoader = (POLoad)rootPOOp;
+ FileSpec loadFileSpec = sideLoader.getLFile();
+ FuncSpec funcSpec = loadFileSpec.getFuncSpec();
+ Object loadfunc = PigContext.instantiateFuncFromSpec(funcSpec);
+ if(i == 0){
+
+ if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass())))
+ throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.");
+
+ ((LoadFunc)loadfunc).setUDFContextSignature(sideLoader.getSignature());
+ ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
+ continue;
+ }
+ if(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass())))
+ throw new MRCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.");
+
+ funcSpecs.add(funcSpec);
+ fileSpecs.add(loadFileSpec.getFileName());
+ loaderSigns.add(sideLoader.getSignature());
+ MRPlan.remove(mrOper);
+ }
+
+ poCoGrp.setSideLoadFuncs(funcSpecs);
+ poCoGrp.setSideFileSpecs(fileSpecs);
+ poCoGrp.setLoaderSignatures(loaderSigns);
+
+ // Use map-reduce operator of base relation for the cogroup operation.
+ MapReduceOper baseMROp = phyToMROpMap.get(poCoGrp.getInputs().get(0));
+ if(baseMROp.mapDone || !baseMROp.reducePlan.isEmpty())
+ throw new MRCompilerException("Currently merged cogroup is not supported after blocking operators.");
+
+ // Create new map-reduce operator for indexing job and then configure it.
+ MapReduceOper indexerMROp = getMROp();
+ FileSpec idxFileSpec = getIndexingJob(indexerMROp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
+ poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
+ poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+
+ baseMROp.mapPlan.addAsLeaf(poCoGrp);
+ MRPlan.add(indexerMROp);
+ MRPlan.connect(indexerMROp, baseMROp);
+
+ phyToMROpMap.put(poCoGrp,baseMROp);
+ // Going forward, new operators should be added to baseMROp. To ensure
+ // that, reset curMROp.
+ curMROp = baseMROp;
+ }
+ catch (ExecException e){
+ throw new MRCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e);
+ }
+ catch (MRCompilerException mrce){
+ throw(mrce);
+ }
+ catch (CloneNotSupportedException e) {
+ throw new MRCompilerException(e);
+ }
+ catch(PlanException e){
+ int errCode = 2034;
+ String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName();
+ throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+ }
+ catch (IOException e){
+ int errCode = 3000;
+ String errMsg = "IOException caught while compiling POMergeCoGroup";
+ throw new MRCompilerException(errMsg, errCode,e);
+ }
+ }
+
+ // Sets up the indexing job for map-side cogroups.
+ private FileSpec getIndexingJob(MapReduceOper indexerMROp,
+ final MapReduceOper baseMROp, final List<PhysicalPlan> mapperLRInnerPlans)
+ throws MRCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
+
+ // First replace loader with MergeJoinIndexer.
+ PhysicalPlan baseMapPlan = baseMROp.mapPlan;
+ POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);
+ FileSpec origLoaderFileSpec = baseLoader.getLFile();
+ FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
+ Object loadFunc = PigContext.instantiateFuncFromSpec(funcSpec);
+
+ if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+ int errCode = 1104;
+ String errMsg = "Base relation of merge-coGroup must implement " +
+ "OrderedLoadFunc interface. The specified loader "
+ + funcSpec + " doesn't implement it";
+ throw new MRCompilerException(errMsg,errCode);
+ }
+
+ String[] indexerArgs = new String[6];
+ indexerArgs[0] = funcSpec.toString();
+ indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
+ indexerArgs[3] = baseLoader.getSignature();
+ indexerArgs[4] = baseLoader.getOperatorKey().scope;
+ indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
+ PhysicalPlan phyPlan;
+ if (baseMapPlan.getSuccessors(baseLoader) == null
+ || baseMapPlan.getSuccessors(baseLoader).isEmpty()){
+ // Load-Load-Cogroup case.
+ phyPlan = null;
+ }
+
+ else{ // We got something. Yank it and set it as inner plan.
+ phyPlan = baseMapPlan.clone();
+ PhysicalOperator root = phyPlan.getRoots().get(0);
+ phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
+ phyPlan.remove(root);
+
+ }
+ indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
+
+ POLoad idxJobLoader = getLoad();
+ idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
+ new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
+ indexerMROp.mapPlan.add(idxJobLoader);
+
+ // Loader of mro will return a tuple of form -
+ // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
+
+ // After getting an index entry in each mapper, send all of them to one
+ // reducer where they will be sorted on the way by Hadoop.
+ simpleConnectMapToReduce(indexerMROp);
+
+ indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
+
+ // We want to use typed tuple comparator for this job, instead of default
+ // raw binary comparator used by Pig, to make sure index entries are
+ // sorted correctly by Hadoop.
+ indexerMROp.useTypedComparator(true);
+
+ POStore st = getStore();
+ FileSpec strFile = getTempFileSpec();
+ st.setSFile(strFile);
+ indexerMROp.reducePlan.addAsLeaf(st);
+ indexerMROp.setReduceDone(true);
+
+ return strFile;
+ }
+
/** Since merge-join works on two inputs there are exactly two MROper predecessors identified as left and right.
* Instead of merging two operators, both are used to generate a MR job each. First MR oper is run to generate on-the-fly index on right side.
* Second is used to actually do the join. First MR oper is identified as rightMROper and second as curMROper.
@@ -1194,7 +1373,7 @@ public class MRCompiler extends PhyPlanV
}
joinOp.setupRightPipeline(rightPipelinePlan);
- rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
+ rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
// At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
@@ -1236,8 +1415,9 @@ public class MRCompiler extends PhyPlanV
}
}
} else {
+
// Replace POLoad with indexer.
- String[] indexerArgs = new String[3];
+ String[] indexerArgs = new String[6];
FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
if (! (PigContext.instantiateFuncFromSpec(indexerArgs[0]) instanceof OrderedLoadFunc)){
@@ -1250,73 +1430,19 @@ public class MRCompiler extends PhyPlanV
List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
+ indexerArgs[3] = rightLoader.getSignature();
+ indexerArgs[4] = rightLoader.getOperatorKey().scope;
+ indexerArgs[5] = Boolean.toString(true);
+
FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
rightLoader.setLFile(lFile);
// Loader of mro will return a tuple of form -
// (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
- // Now set up a POLocalRearrange which has "all" as the key and tuple fetched
- // by loader as the "value" of POLocalRearrange
- // Sorting of index can possibly be achieved by using Hadoop sorting
- // between map and reduce instead of Pig doing sort. If that is so,
- // it will simplify lot of the code below.
-
- PhysicalPlan lrPP = new PhysicalPlan();
- ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
- ce.setValue("all");
- ce.setResultType(DataType.CHARARRAY);
- lrPP.add(ce);
-
- List<PhysicalPlan> lrInnerPlans = new ArrayList<PhysicalPlan>();
- lrInnerPlans.add(lrPP);
-
- POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
- lr.setIndex(0);
- lr.setKeyType(DataType.CHARARRAY);
- lr.setPlans(lrInnerPlans);
- lr.setResultType(DataType.TUPLE);
- rightMROpr.mapPlan.addAsLeaf(lr);
-
- rightMROpr.setMapDone(true);
-
- // On the reduce side of this indexing job, there will be a global rearrange followed by POSort.
- // Output of POSort will be index file dumped on the DFS.
-
- // First add POPackage.
- POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.CHARARRAY);
- pkg.setNumInps(1);
- pkg.setInner(new boolean[]{false});
- rightMROpr.reducePlan.add(pkg);
-
- // Next project tuples from the bag created by POPackage.
- POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- topPrj.setColumn(1);
- topPrj.setResultType(DataType.TUPLE);
- topPrj.setOverloaded(true);
- rightMROpr.reducePlan.add(topPrj);
- rightMROpr.reducePlan.connect(pkg, topPrj);
-
- // Now create and add POSort. Sort plan is project *.
- List<PhysicalPlan> sortPlans = new ArrayList<PhysicalPlan>(1);
- PhysicalPlan innerSortPlan = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setStar(true);
- prj.setOverloaded(false);
- prj.setResultType(DataType.TUPLE);
- innerSortPlan.add(prj);
- sortPlans.add(innerSortPlan);
-
- // Currently we assume all columns are in asc order.
- // Add two because filename and offset are added by Indexer in addition to keys.
- List<Boolean> mAscCols = new ArrayList<Boolean>(rightInpPlans.size()+2);
- for(int i=0; i< rightInpPlans.size()+2; i++)
- mAscCols.add(true);
-
- POSort sortOp = new POSort(new OperatorKey(scope,nig.getNextNodeId(scope)),1, null, sortPlans, mAscCols, null);
- rightMROpr.reducePlan.add(sortOp);
- rightMROpr.reducePlan.connect(topPrj, sortOp);
-
+
+ simpleConnectMapToReduce(rightMROpr);
+ rightMROpr.useTypedComparator(true);
+
POStore st = getStore();
FileSpec strFile = getTempFileSpec();
st.setSFile(strFile);
@@ -1334,12 +1460,8 @@ public class MRCompiler extends PhyPlanV
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
joinOp.setIndexFile(strFile.getFileName());
-
}
-
-// joinOp.setIndexFile(strFile);
-
// We are done with right side. Lets work on left now.
// Join will be materialized in leftMROper.
if(!curMROp.mapDone) // Life is easy
@@ -1657,7 +1779,7 @@ public class MRCompiler extends PhyPlanV
throw new PlanException(msg, errCode, PigException.BUG);
}
- public MapReduceOper getSortJob(
+ private MapReduceOper getSortJob(
POSort sort,
MapReduceOper quantJob,
FileSpec lFile,
@@ -1818,7 +1940,7 @@ public class MRCompiler extends PhyPlanV
return mro;
}
- public Pair<MapReduceOper,Integer> getQuantileJob(
+ private Pair<MapReduceOper,Integer> getQuantileJob(
POSort inpSort,
MapReduceOper prevJob,
FileSpec lFile,
@@ -1854,7 +1976,7 @@ public class MRCompiler extends PhyPlanV
/**
* Create Sampling job for skewed join.
*/
- public Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob,
+ private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob,
FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
@@ -1932,7 +2054,7 @@ public class MRCompiler extends PhyPlanV
* @throws VisitorException
*/
@SuppressWarnings("deprecation")
- protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
+ private Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Jul 9 17:48:23 2010
@@ -18,18 +18,12 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
import java.util.Set;
-import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.Operator;
@@ -41,7 +35,7 @@ import org.apache.pig.impl.plan.VisitorE
* Acts as a host to the plans that will
* execute in map, reduce and optionally combine
* phases. These will be embedded in the MROperPlan
- * in order to capture the dependecies amongst jobs.
+ * in order to capture the dependencies amongst jobs.
*/
public class MapReduceOper extends Operator<MROpPlanVisitor> {
private static final long serialVersionUID = 1L;
@@ -133,6 +127,12 @@ public class MapReduceOper extends Opera
// Used by Skewed Join
private String skewedJoinPartitionFile;
+ // Flag to communicate from MRCompiler to JobControlCompiler what kind of
+ // comparator is used by Hadoop for sorting for this MROper.
+ // By default, set to false which will make Pig provide raw comparators.
+ // Set to true in indexing job generated in map-side cogroup, merge join.
+ private boolean usingTypedComparator = false;
+
public MapReduceOper(OperatorKey k) {
super(k);
mapPlan = new PhysicalPlan();
@@ -383,4 +383,12 @@ public class MapReduceOper extends Opera
public void setUseSecondaryKey(boolean useSecondaryKey) {
this.useSecondaryKey = useSecondaryKey;
}
+
+ protected boolean usingTypedComparator() {
+ return usingTypedComparator;
+ }
+
+ protected void useTypedComparator(boolean useTypedComparator) {
+ this.usingTypedComparator = useTypedComparator;
+ }
}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MergeJoinIndexer.java Fri Jul 9 17:48:23 2010
@@ -58,6 +58,7 @@ public class MergeJoinIndexer extends L
private Tuple dummyTuple = null;
private LoadFunc loader;
private PigSplit pigSplit = null;
+ private boolean ignoreNullKeys;
/** @param funcSpec : Loader specification.
* @param innerPlan : This is serialized version of LR plan. We
@@ -67,12 +68,16 @@ public class MergeJoinIndexer extends L
* @throws ExecException
*/
@SuppressWarnings("unchecked")
- public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan) throws ExecException{
+ public MergeJoinIndexer(String funcSpec, String innerPlan, String serializedPhyPlan,
+ String udfCntxtSignature, String scope, String ignoreNulls) throws ExecException{
loader = (LoadFunc)PigContext.instantiateFuncFromSpec(funcSpec);
+ loader.setUDFContextSignature(udfCntxtSignature);
+ this.ignoreNullKeys = Boolean.parseBoolean(ignoreNulls);
+
try {
List<PhysicalPlan> innerPlans = (List<PhysicalPlan>)ObjectSerializer.deserialize(innerPlan);
- lr = new POLocalRearrange(new OperatorKey("MergeJoin Indexer",NodeIdGenerator.getGenerator().getNextNodeId("MergeJoin Indexer")));
+ lr = new POLocalRearrange(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)));
lr.setPlans(innerPlans);
keysCnt = innerPlans.size();
precedingPhyPlan = (PhysicalPlan)ObjectSerializer.deserialize(serializedPhyPlan);
@@ -121,7 +126,7 @@ public class MergeJoinIndexer extends L
lr.attachInput(readTuple);
key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
lr.detachInput();
- if ( null == key) // Tuple with null key. Drop it.
+ if ( null == key && ignoreNullKeys) // Tuple with null key. Drop it.
continue;
break;
}
@@ -141,7 +146,7 @@ public class MergeJoinIndexer extends L
lr.attachInput((Tuple)res.result);
key = ((Tuple)lr.getNext(dummyTuple).result).get(1);
lr.detachInput();
- if ( null == key) // Tuple with null key. Drop it.
+ if ( null == key && ignoreNullKeys) // Tuple with null key. Drop it.
continue;
fetchNewTup = false;
break;
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Fri Jul 9 17:48:23 2010
@@ -296,4 +296,10 @@ public class PhyPlanSetter extends PhyPl
preCombinerLocalRearrange.setParentPlan(parent);
}
+
+ @Override
+ public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
+ throws VisitorException {
+ mergeCoGrp.setParentPlan(parent);
+ }
}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Jul 9 17:48:23 2010
@@ -185,7 +185,7 @@ public class PigSplit extends InputSplit
// package level access because we don't want LoadFunc implementations
// to get this information - this is to be used only from
// MergeJoinIndexer
- int getSplitIndex() {
+ public int getSplitIndex() {
return splitIndex;
}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Jul 9 17:48:23 2010
@@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -88,6 +89,11 @@ public class EndOfAllInputSetter extends
endOfAllInputFlag = true;
}
+
+    /**
+     * A merge cogroup drains its heap of pending groups only once it is told
+     * that all input has been seen, so plans containing one need the
+     * end-of-all-input flag.
+     *
+     * @param mergeCoGrp the merge cogroup operator being visited
+     */
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException {
+        endOfAllInputFlag = true;
+    }
+
/**
* @return if end of all input is present
*/
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Jul 9 17:48:23 2010
@@ -23,20 +23,13 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Stack;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
-import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
-import org.apache.pig.SortInfo;
-import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
-import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
@@ -48,7 +41,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.builtin.IsEmpty;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
@@ -666,14 +658,94 @@ public class LogToPhyTranslationVisitor
+ /**
+ * Translates a logical (co)group into physical operators, dispatching on
+ * its group type: COLLECTED, REGULAR, or MERGE (map-side cogroup).
+ *
+ * @param cg the logical cogroup being translated
+ * @throws VisitorException if translation fails or the group type is unknown
+ */
@Override
public void visit(LOCogroup cg) throws VisitorException {
- if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
-
+ switch (cg.getGroupType()) {
+
+ case COLLECTED:
translateCollectedCogroup(cg);
+ break;
- } else {
-
+ case REGULAR:
translateRegularCogroup(cg);
+ break;
+
+ case MERGE:
+ translateMergeCogroup(cg);
+ break;
+
+ default:
+ // NOTE(review): PigException.BUG is an error *source*; with two
+ // arguments this presumably binds to (String message, int errCode),
+ // making BUG the error code. Confirm whether
+ // (msg, <errCode>, PigException.BUG) was intended.
+ throw new LogicalToPhysicalTranslatorException("Unknown CoGroup Modifier",PigException.BUG);
+ }
+ }
+
+ /**
+ * Translates a MERGE (map-side) cogroup. For every input it builds a
+ * POLocalRearrange whose plans are the physical translations of that
+ * input's group-by expressions, then creates a single POMergeCogroup fed
+ * by the physical operators of all inputs and records it in logToPhyMap.
+ *
+ * @param loCogrp the logical cogroup being translated
+ * @throws VisitorException if a local rearrange's plans or index cannot be
+ * set, or if the new operator cannot be connected to its inputs
+ */
+ private void translateMergeCogroup(LOCogroup loCogrp) throws VisitorException{
+
+ String scope = loCogrp.getOperatorKey().scope;
+ List<LogicalOperator> inputs = loCogrp.getInputs();
+
+ // LocalRearrange corresponding to each of input
+ // LR is needed to extract keys out of the tuples.
+
+ POLocalRearrange[] innerLRs = new POLocalRearrange[inputs.size()];
+ int count = 0;
+ List<PhysicalOperator> inpPOs = new ArrayList<PhysicalOperator>(inputs.size());
+
+ for (LogicalOperator op : inputs) {
+ PhysicalOperator physOp = logToPhyMap.get(op);
+ inpPOs.add(physOp);
+
+ // NOTE(review): unchecked cast of the group-by plans of this input.
+ List<LogicalPlan> plans = (List<LogicalPlan>)loCogrp.getGroupByPlans().get(op);
+ POLocalRearrange poInnerLR = new POLocalRearrange(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+ poInnerLR.setAlias(loCogrp.getAlias());
+ // LR will contain list of physical plans, because there could be
+ // multiple keys and each key can be an expression.
+ List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+ // Save the enclosing plan; each key expression is translated into its
+ // own fresh PhysicalPlan by walking its inner logical plan.
+ currentPlans.push(currentPlan);
+ for (LogicalPlan lp : plans) {
+ currentPlan = new PhysicalPlan();
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+ .spawnChildWalker(lp);
+ pushWalker(childWalker);
+ mCurrentWalker.walk(this);
+ exprPlans.add(currentPlan);
+ popWalker();
+ }
+ currentPlan = currentPlans.pop();
+ try {
+ poInnerLR.setPlans(exprPlans);
+ } catch (PlanException pe) {
+ int errCode = 2071;
+ String msg = "Problem with setting up local rearrange's plans.";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+ }
+ // The LR at position i of innerLRs gets index i, i.e. the position of
+ // its input in the cogroup.
+ innerLRs[count] = poInnerLR;
+ try {
+ poInnerLR.setIndex(count++);
+ } catch (ExecException e1) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly create POLocalRearrange.";
+ throw new VisitorException(msg, errCode, PigException.BUG, e1);
+ }
+ // Several key expressions produce a tuple key; a single one produces the
+ // result type of its plan's leaf.
+ // NOTE(review): exprPlans.get(0) assumes each input has at least one
+ // group-by plan.
+ poInnerLR.setKeyType(plans.size() > 1 ? DataType.TUPLE :
+ exprPlans.get(0).getLeaves().get(0).getResultType());
+ poInnerLR.setResultType(DataType.TUPLE);
+ }
+
+ POMergeCogroup poCogrp = new POMergeCogroup(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)),inpPOs,innerLRs,
+ loCogrp.getRequestedParallelism());
+ poCogrp.setAlias(loCogrp.getAlias());
+ poCogrp.setResultType(DataType.TUPLE);
+ currentPlan.add(poCogrp);
+ // Wire every input's physical operator into the new merge cogroup.
+ for (LogicalOperator op : inputs) {
+ try {
+ currentPlan.connect(logToPhyMap.get(op), poCogrp);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
}
+ logToPhyMap.put(loCogrp, poCogrp);
}
private void translateRegularCogroup(LOCogroup cg) throws VisitorException {
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Jul 9 17:48:23 2010
@@ -266,6 +266,10 @@ public class PhyPlanVisitor extends Plan
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
//do nothing
}
+
+    /**
+     * Visits a merge cogroup operator. The base visitor does nothing;
+     * subclasses override this to act on the operator.
+     *
+     * @param mergeCoGrp the merge cogroup operator being visited
+     * @throws VisitorException
+     */
+    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException {
+        //do nothing
+    }
/**
* @param stream
* @throws VisitorException
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Fri Jul 9 17:48:23 2010
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -35,7 +34,6 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Jul 9 17:48:23 2010
@@ -122,6 +122,9 @@ public class POLocalRearrange extends Ph
private boolean useSecondaryKey = false;
+ // By default, we strip keys from the value. When false (see
+ // setStripKeyFromValue), getNext() puts the key in field 1 and the
+ // unstripped value in field 2 and returns before the distinct/stripping
+ // logic; POMergeCogroup relies on this to keep full input tuples.
+ private boolean stripKeyFromValue = true;
+
+ // Delegates to the three-argument constructor with -1 and null.
public POLocalRearrange(OperatorKey k) {
this(k, -1, null);
}
@@ -412,7 +415,6 @@ public class POLocalRearrange extends Ph
Object key;
Object secondaryKey=null;
-
if (secondaryResLst!=null && secondaryResLst.size()>0)
{
key = getKeyFromResult(resLst, mainKeyType);
@@ -420,6 +422,13 @@ public class POLocalRearrange extends Ph
} else
key = getKeyFromResult(resLst, keyType);
+
+ if(!stripKeyFromValue){
+ lrOutput.set(1, key);
+ lrOutput.set(2, value);
+ return lrOutput;
+ }
+
if (mIsDistinct) {
//Put the key and the indexed tuple
@@ -781,4 +790,8 @@ public class POLocalRearrange extends Ph
}
+    /**
+     * Controls whether getNext() strips the key fields from the value it
+     * emits. When set to false, field 2 of the output holds the value as it
+     * was before stripping, and the distinct/stripping logic that follows
+     * key extraction is skipped.
+     *
+     * @param stripKeyFromValue true (the default) to strip keys from the value
+     */
+    protected void setStripKeyFromValue(final boolean stripKeyFromValue) {
+        this.stripKeyFromValue = stripKeyFromValue;
+    }
+
}
Added: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=962618&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (added)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Jul 9 17:48:23 2010
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+
+public class POMergeCogroup extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    // Load functions of the side relations (every relation except the mapped
+    // one). Instantiated on the task side in setup(), hence transient.
+    private transient List<LoadFunc> sideLoaders;
+
+    // FuncSpecs of the side loaders; entry i belongs to relation i+1.
+    private List<FuncSpec> sideFuncSpecs;
+
+    // Locations of the side relations, in the same order as sideFuncSpecs.
+    private List<String> sideFileSpecs;
+
+    // Local Rearranges used to extract the key out of the tuples we get from
+    // the mapper (index 0) and from the side loaders (index 1 onwards).
+    private POLocalRearrange[] LRs;
+
+    // True until the first tuple of the mapped relation has been seen.
+    private transient boolean firstTime;
+
+    // First key of the split following ours, or null if this is the last
+    // split. Side tuples whose key is >= this value belong to later maps.
+    private transient Comparable<Object> firstKeyOfNextSplit;
+
+    // This is the count of all the relations involved in Cogroup. The mapped
+    // relation is also included in the count.
+    private transient int relationCnt;
+
+    private transient TupleFactory mTupleFactory;
+
+    // Index of the mapped relation (one entry per split) and its loader.
+    private String indexFileName;
+
+    private FuncSpec idxFuncSpec;
+
+    // One bag per relation, holding the tuples of the key being assembled.
+    private transient DataBag[] outBags;
+
+    // Tuple most recently polled from the heap; its key is the key of the
+    // group currently being assembled.
+    private transient Tuple prevTopOfHeap;
+
+    // UDFContext signatures of the side loaders, same order as sideFuncSpecs.
+    private List<String> loaderSignatures;
+
+    // Set after an output tuple is produced so that the next call starts
+    // with empty bags.
+    private transient boolean createNewBags;
+
+    // Heap contains tuples with exactly three fields, which is the same as the
+    // output of LR except for the third field, which contains the full-fledged
+    // input tuple, as opposed to the key-stripped tuple generated by LR.
+    private transient PriorityQueue<Tuple> heap;
+
+    // True once this map has produced its final output tuple.
+    private transient boolean lastTime;
+
+    // Need to know in each getNext() whether we generated a valid output in
+    // the last call or not.
+    private transient boolean workingOnNewKey;
+
+    /**
+     * @param k operator key
+     * @param inpPOs physical inputs of this operator
+     * @param lrs one POLocalRearrange per relation, index 0 being the mapped
+     *            relation; each is told to keep the key in the value so that
+     *            full input tuples end up in the output bags
+     * @param parallel requested parallelism
+     */
+    public POMergeCogroup(OperatorKey k, List<PhysicalOperator> inpPOs,
+            POLocalRearrange[] lrs, int parallel) {
+
+        super(k, parallel, inpPOs);
+        this.LRs = lrs;
+        for (POLocalRearrange lr : lrs) {
+            lr.setStripKeyFromValue(false);
+        }
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+
+        try {
+            if (createNewBags) {
+                // This implies we just generated an output tuple in the last
+                // call. So, it's time to create new bags.
+                for (int i = 0; i < relationCnt; i++) {
+                    outBags[i] = new InternalCachedBag(relationCnt);
+                }
+                createNewBags = false;
+            }
+
+            // Get the tuple from predecessor.
+            Result baseInp = processInput();
+            Tuple rearranged;
+
+            switch (baseInp.returnStatus) {
+
+            case POStatus.STATUS_OK:
+                rearranged = applyLRon((Tuple) baseInp.result, 0);
+                break;
+
+            case POStatus.STATUS_EOP:
+                if (!this.parentPlan.endOfAllInput) {
+                    return baseInp;
+                }
+                // Nothing left to emit if the final output was already
+                // produced, or if this map never received a single tuple
+                // (setup() never ran, so there is no pending key to output).
+                if (lastTime || firstTime) {
+                    return baseInp;
+                }
+                // We got all the records from mapper but have not yet
+                // generated all the outputs that we need to, so we continue
+                // filling and draining the heap from side loaders.
+                return drainAfterEndOfInput();
+
+            default: // In case of errors, we return the tuple as it is.
+                return baseInp;
+            }
+
+            // Every time we read something valid from mapper, we add it to heap.
+            heap.offer(rearranged);
+
+            if (firstTime) {
+                // Heap is initialized with the first record from all the relations.
+                setup(rearranged);
+                firstTime = false;
+            }
+
+            /* Algorithm is as follows:
+             *
+             * while (there are records in heap) {
+             *     peek at top record from heap
+             *     if we see a key change from previous run, break to return output.
+             *     else,
+             *         pull tuple from top of heap and place it in bag based on which input it came from
+             *         pull record from input that last record from top of heap came from
+             *         if (key pulled < first key in next split) insert into heap
+             * }
+             * output final record
+             */
+
+            while (!heap.isEmpty()) {
+
+                if (needToBreak(heap.peek(), prevTopOfHeap)) {
+                    break;
+                }
+
+                workingOnNewKey = false;
+                byte relIdx = pollIntoBag();
+
+                if (relIdx == 0) {
+                    /* At this point, it's possible to return the output tuple.
+                     * For tuples having equal keys, the tuple from the
+                     * leftmost relation is considered largest. So, if the
+                     * tuple now on top of the heap is from the leftmost
+                     * relation and has a different key than the tuple we
+                     * polled last, we have seen all the tuples belonging to
+                     * this key and can generate the output tuple.
+                     * Else, pull the next tuple from the left relation by
+                     * returning EOP.
+                     */
+                    Tuple next = heap.peek();
+                    if (next != null && (Byte) next.get(0) == 0) {
+                        Object prevKey = prevTopOfHeap.get(1);
+                        Object nextKey = next.get(1);
+                        // Null-safe key-change test: null -> non-null,
+                        // non-null -> null, or between two different non-null
+                        // keys. Two null keys of the left relation form one group.
+                        boolean keyChanged = (prevKey == null)
+                                ? nextKey != null
+                                : !prevKey.equals(nextKey);
+                        if (keyChanged) {
+                            return getOutputTuple();
+                        }
+                    }
+                    // Either top of heap is from a different relation (or the
+                    // heap is empty), or it is from the left relation but has
+                    // the same key.
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+
+                // Pull the next tuple from the side loader the polled tuple came from.
+                advanceSideLoader(relIdx);
+            }
+
+            return getOutputTuple();
+        } catch (IOException ioe) {
+            throw new ExecException(ioe);
+        }
+    }
+
+    /**
+     * Called once the mapper has no more input: keeps moving tuples from the
+     * heap into the bags (refilling the heap from the side loaders) until the
+     * key changes, then returns the output tuple of the completed key. Sets
+     * lastTime when this is the final output this map will produce.
+     */
+    private Result drainAfterEndOfInput() throws IOException {
+        while (!heap.isEmpty()) {
+
+            if (needToBreak(heap.peek(), prevTopOfHeap)) {
+                break;
+            }
+
+            workingOnNewKey = false;
+            byte relIdx = pollIntoBag();
+
+            if (relIdx == 0) { // We got all the mapper has to offer.
+                continue;
+            }
+            advanceSideLoader(relIdx);
+        }
+
+        // This is the last output we will produce if the heap is empty or its
+        // top is at or beyond the first key of the next split. A null key at
+        // the top still belongs to this split, since nulls sort first.
+        if (heap.isEmpty() || !belongsToThisSplit(heap.peek().get(1))) {
+            lastTime = true;
+        }
+
+        return getOutputTuple();
+    }
+
+    /**
+     * Removes the top of the heap, remembers it as prevTopOfHeap and adds its
+     * full input tuple to the bag of the relation it came from.
+     *
+     * @return index of the relation the polled tuple came from
+     */
+    private byte pollIntoBag() throws ExecException {
+        Tuple topOfHeap = heap.poll();
+        byte relIdx = (Byte) topOfHeap.get(0);
+        prevTopOfHeap = topOfHeap;
+        outBags[relIdx].add((Tuple) topOfHeap.get(2));
+        return relIdx;
+    }
+
+    /**
+     * Reads the next tuple of side relation relIdx (relIdx >= 1) and pushes it
+     * on the heap if it has to be processed by this map. Does nothing once
+     * that relation is exhausted.
+     */
+    private void advanceSideLoader(final int relIdx) throws IOException {
+        Tuple nxtTuple = sideLoaders.get(relIdx - 1).getNext();
+        if (nxtTuple == null) { // EOF for this relation
+            return;
+        }
+        Tuple rearrangedTup = applyLRon(nxtTuple, relIdx);
+        if (belongsToThisSplit(rearrangedTup.get(1))) {
+            heap.offer(rearrangedTup);
+        }
+    }
+
+    /**
+     * A key is processed by this map if this is the last split, if the key is
+     * null (nulls sort before everything else), or if it is smaller than the
+     * first key of the next split.
+     */
+    private boolean belongsToThisSplit(final Object key) {
+        return firstKeyOfNextSplit == null || key == null
+                || firstKeyOfNextSplit.compareTo(key) > 0;
+    }
+
+    /**
+     * Builds the output tuple for the key just completed: field 0 is the key
+     * of the tuple polled last from the heap, followed by one bag per
+     * relation. Also flags that the next call works on a new key and needs
+     * fresh bags.
+     */
+    private Result getOutputTuple() throws ExecException {
+
+        workingOnNewKey = true;
+        createNewBags = true;
+
+        Tuple out = mTupleFactory.newTuple(relationCnt + 1);
+
+        out.set(0, prevTopOfHeap.get(1));
+        for (int i = 0; i < relationCnt; i++) {
+            out.set(i + 1, outBags[i]);
+        }
+
+        return new Result(POStatus.STATUS_OK, out);
+    }
+
+    /**
+     * Decides whether the tuple on top of the heap starts a new group.
+     *
+     * @param curTopOfHeap tuple currently on top of the heap
+     * @param prevTop tuple polled from the heap before it
+     * @return false right after an output tuple was produced; otherwise true
+     *         if the key changed. Two null keys are equal only when they come
+     *         from the same relation.
+     */
+    private boolean needToBreak(final Tuple curTopOfHeap, final Tuple prevTop) throws ExecException {
+
+        if (workingOnNewKey) {
+            // We just returned an output tuple in the previous call, so we
+            // are now working on a fresh key.
+            return false;
+        }
+
+        Object curKey = curTopOfHeap.get(1);
+        Object prevKey = prevTop.get(1);
+
+        if (curKey == null && prevKey == null) {
+            // Both keys are null: check whether they come from the same
+            // relation, because nulls from different relations are not
+            // considered equal.
+            return !((Byte) curTopOfHeap.get(0)).equals((Byte) prevTop.get(0));
+        }
+
+        if (curKey == null || prevKey == null) {
+            // Only one of them is null, so the key has changed.
+            return true;
+        }
+
+        // Both keys are non-null.
+        return !curKey.equals(prevKey);
+    }
+
+    /**
+     * One-time initialization on the first tuple of the mapped relation:
+     * reads the index, computes the first key of the next split, opens every
+     * side relation and seeds the heap with its first relevant tuple.
+     *
+     * @param firstRearrangedTup first mapped tuple, already passed through LR
+     */
+    @SuppressWarnings("unchecked")
+    private void setup(Tuple firstRearrangedTup) throws IOException {
+
+        // Read our own split index.
+        int curSplitIdx = ((PigSplit) ((Context) PigMapReduce.sJobContext).getInputSplit()).getSplitIndex();
+        Object firstBaseKey = firstRearrangedTup.get(1);
+        List<Pair<Integer, Tuple>> index = readIndex();
+
+        // If we are in the last split, firstKeyOfNextSplit is null, which
+        // means: collect all tuples from both the base loader and the side
+        // loaders and process them in this map.
+        // Nulls are smaller than anything else, so if there are nulls in the
+        // data they are in the very first split. Hence a null value of this
+        // variable can be used to tell whether we are in the last split.
+        firstKeyOfNextSplit = getFirstKeyOfNextSplit(curSplitIdx, index);
+
+        // Open all other streams.
+        // If this is the first split, start from the very first record.
+        // For all other splits, bind to the first key which is greater than
+        // or equal to the first key of the map.
+        for (int i = 0; i < relationCnt - 1; i++) {
+            final int relIdx = i + 1; // Relation 0 is the mapped relation.
+
+            LoadFunc loadfunc = (LoadFunc) PigContext.instantiateFuncFromSpec(sideFuncSpecs.get(i));
+            loadfunc.setUDFContextSignature(loaderSignatures.get(i));
+            Job dummyJob = new Job(new Configuration(PigMapReduce.sJobConf));
+            loadfunc.setLocation(sideFileSpecs.get(i), dummyJob);
+            ((IndexableLoadFunc) loadfunc).initialize(dummyJob.getConfiguration());
+            sideLoaders.add(loadfunc);
+
+            if (index.get(0).first.equals(curSplitIdx)) {
+                // This is the first split: bind at the very first record in
+                // all side relations.
+                Tuple first = loadfunc.getNext();
+                if (first != null) { // null: this side relation is entirely empty.
+                    heap.offer(applyLRon(first, relIdx));
+                }
+            } else {
+                seekToBaseKey(loadfunc, relIdx, firstBaseKey);
+            }
+        }
+    }
+
+    /**
+     * Positions side loader relIdx at the first tuple whose non-null key is
+     * greater than or equal to firstBaseKey, and pushes that tuple on the heap
+     * if it has to be processed by this map.
+     */
+    @SuppressWarnings("unchecked")
+    private void seekToBaseKey(final LoadFunc loadfunc, final int relIdx,
+            final Object firstBaseKey) throws IOException {
+
+        // First seek close to base key.
+        ((IndexableLoadFunc) loadfunc).seekNear(firstBaseKey instanceof Tuple
+                ? (Tuple) firstBaseKey : mTupleFactory.newTuple(firstBaseKey));
+
+        // The contract of IndexableLoadFunc does not say where we land after
+        // seekNear(), so read forward until the key is actually greater than
+        // or equal to the base key.
+        while (true) {
+            Tuple t = loadfunc.getNext();
+            if (t == null) { // This relation has ended.
+                return;
+            }
+            Tuple rearranged = applyLRon(t, relIdx);
+            Object key = rearranged.get(1);
+            if (key == null) { // A null key here implies we are still behind.
+                continue;
+            }
+            if (((Comparable<Object>) key).compareTo(firstBaseKey) >= 0) {
+                // We got ahead. Add this tuple to the heap only if it needs
+                // to be processed in this map, i.e. it is smaller than the
+                // next split's first key, unless this is the last split.
+                if (belongsToThisSplit(key)) {
+                    heap.offer(rearranged);
+                }
+                return;
+            }
+        }
+    }
+
+    /**
+     * Reads the index of the mapped relation from the local copy of the index
+     * file.
+     *
+     * @return one (split index, key tuple) pair per index entry, in file order
+     */
+    private List<Pair<Integer, Tuple>> readIndex() throws ExecException {
+
+        // Assertions on the index we are about to read:
+        // We are reading the index from a file through POLoad, which returns
+        // tuples that look as follows:
+        // (key1, key2, key3,..,WritableComparable(wc),splitIdx(si))
+        // Index is sorted on key1, then on key2, key3.. , wc, si.
+        // Index contains exactly one entry per split.
+        // Since this is loaded through CollectableLoadFunc, keys can't repeat.
+        // Thus, each index entry contains a unique key.
+        // Since nulls are smaller than anything else, if present in the data,
+        // the key with null value should be the very first entry of the index.
+
+        // First create a loader to read index.
+        POLoad ld = new POLoad(new OperatorKey(this.mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
+                new FileSpec(indexFileName, idxFuncSpec));
+
+        // Index file is distributed through Distributed Cache to all mappers. So, read it locally.
+        Properties props = ConfigurationUtil.getLocalFSProperties();
+        ld.setPc(new PigContext(ExecType.LOCAL, props));
+
+        // Each index entry is read as a pair of split index and a tuple consisting of key.
+        List<Pair<Integer, Tuple>> index = new ArrayList<Pair<Integer, Tuple>>();
+
+        for (Result res = ld.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP; res = ld.getNext(dummyTuple)) {
+
+            Tuple idxTuple = (Tuple) res.result;
+            int colCnt = idxTuple.size() - 2;
+            Tuple keyTuple = mTupleFactory.newTuple(colCnt);
+
+            for (int i = 0; i < colCnt; i++) {
+                keyTuple.set(i, idxTuple.get(i));
+            }
+
+            index.add(new Pair<Integer, Tuple>((Integer) idxTuple.get(colCnt + 1), keyTuple));
+        }
+
+        return index;
+    }
+
+    /**
+     * Returns the key of the index entry following the one for curSplitIdx:
+     * the bare key for single-column keys, the key tuple otherwise. Returns
+     * null when curSplitIdx is the last entry, and also when it is not found
+     * in the index (the map is then treated as the last split).
+     */
+    @SuppressWarnings("unchecked")
+    private Comparable<Object> getFirstKeyOfNextSplit(final int curSplitIdx, final List<Pair<Integer, Tuple>> index) throws IOException {
+
+        // First find out the index entry corresponding to our current split.
+        int i;
+        for (i = 0; i < index.size(); i++) {
+            if (index.get(i).first.equals(curSplitIdx)) {
+                break;
+            }
+        }
+
+        // Now read key of the very next index entry.
+        if (i < index.size() - 1) {
+            Tuple keyTuple = index.get(i + 1).second;
+            return keyTuple.size() == 1 ? (Comparable<Object>) keyTuple.get(0) : keyTuple;
+        }
+
+        // If we are here, the current split is the last split.
+        return null;
+    }
+
+    /**
+     * Separates key and value of an input tuple using the LR of its relation.
+     *
+     * @return a copy of the LR output: (relation index, key, full input tuple)
+     * @throws ExecException if the LR does not return STATUS_OK
+     */
+    private Tuple applyLRon(final Tuple inp, final int lrIdx) throws ExecException {
+
+        // Separate Key & Value of input using corresponding LR operator
+        POLocalRearrange lr = LRs[lrIdx];
+        lr.attachInput(inp);
+        Result lrOut = lr.getNext(dummyTuple);
+
+        if (lrOut.returnStatus != POStatus.STATUS_OK) {
+            int errCode = 2167;
+            String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
+            throw new ExecException(errMsg, errCode, PigException.BUG);
+        }
+
+        return mTupleFactory.newTuple(((Tuple) lrOut.result).getAll());
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitMergeCoGroup(this);
+    }
+
+    @Override
+    public String name() {
+        return "MergeCogroup["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    public List<PhysicalPlan> getLRInnerPlansOf(int i) {
+        return this.LRs[i].getPlans();
+    }
+
+    public void setSideLoadFuncs(List<FuncSpec> sideLoadFuncs) {
+        this.sideFuncSpecs = sideLoadFuncs;
+    }
+
+    public void setSideFileSpecs(List<String> sideFileSpecs) {
+        this.sideFileSpecs = sideFileSpecs;
+    }
+
+    public String getIndexFileName() {
+        return indexFileName;
+    }
+
+    public void setIndexFileName(String indexFileName) {
+        this.indexFileName = indexFileName;
+    }
+
+    public FuncSpec getIdxFuncSpec() {
+        return idxFuncSpec;
+    }
+
+    public void setIdxFuncSpec(FuncSpec idxFileSpec) {
+        this.idxFuncSpec = idxFileSpec;
+    }
+
+    public void setLoaderSignatures(List<String> loaderSignatures) {
+        this.loaderSignatures = loaderSignatures;
+    }
+
+    // Re-creates all transient, task-side state after deserialization.
+    private void readObject(final ObjectInputStream is) throws IOException,
+            ClassNotFoundException, ExecException {
+
+        is.defaultReadObject();
+        mTupleFactory = TupleFactory.getInstance();
+        this.heap = new PriorityQueue<Tuple>(11, new Comparator<Tuple>() {
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public int compare(final Tuple resLHS, final Tuple resRHS) {
+                try {
+                    // First, about null keys.
+                    // Java's priority queue doesn't handle null, so we handle it ourselves.
+
+                    // If both keys are null, then keys of side loader are considered smaller.
+                    // That is so, because when we poll the heap, we want to get the keys
+                    // of side loader first, then the keys of base loader.
+
+                    // If null is compared against non-null, null is smaller.
+
+                    Object leftKey = resLHS.get(1);
+                    Object rightKey = resRHS.get(1);
+
+                    if (null == leftKey && null == rightKey) {
+                        return ((Byte) resRHS.get(0)).compareTo((Byte) resLHS.get(0));
+                    }
+                    if (null == leftKey) {
+                        return -1;
+                    }
+                    if (null == rightKey) {
+                        return 1;
+                    }
+
+                    // Now, about non-null keys.
+                    // Compare the keys which are at index 1 of tuples being
+                    // put in heap.
+
+                    int cmpval = ((Comparable<Object>) leftKey).compareTo(rightKey);
+
+                    // If keys are equal, tuples from side relations
+                    // are considered smaller. This is so because we want
+                    // to get back the tuple of the mapper last when polling
+                    // tuples from heap. And index of mapped relation is 0.
+
+                    return cmpval == 0 ? ((Byte) resRHS.get(0)).compareTo((Byte) resLHS.get(0)) : cmpval;
+                } catch (ExecException e) {
+
+                    // Alas, no choice but to throw Runtime exception.
+                    String errMsg = "Exception occured in compare() of heap in POMergeCogroup.";
+                    throw new RuntimeException(errMsg, e);
+                }
+            }
+        });
+
+        this.createNewBags = true;
+        this.lastTime = false;
+        this.relationCnt = LRs.length;
+        this.outBags = new DataBag[relationCnt];
+        this.firstTime = true;
+        this.workingOnNewKey = true;
+        this.sideLoaders = new ArrayList<LoadFunc>();
+    }
+
+    // This function is only for debugging. Call it whenever you want to print
+    // the current state of heap.
+    int counter = 0;
+    private void printHeap() {
+        System.out.println("Printing heap :" + ++counter);
+        PriorityQueue<Tuple> copy = new PriorityQueue<Tuple>(heap);
+        System.out.println("Heap size: " + heap.size());
+        int i = 0;
+        while (!copy.isEmpty()) {
+            System.out.println(i++ + "th item in heap: " + copy.poll());
+        }
+    }
+}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultTuple.java Fri Jul 9 17:48:23 2010
@@ -25,11 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Fri Jul 9 17:48:23 2010
@@ -50,7 +50,8 @@ public class LOCogroup extends Relationa
*/
public static enum GROUPTYPE {
REGULAR, // Regular (co)group
- COLLECTED // Collected group
+ COLLECTED, // Collected group
+ MERGE // Map-side (co)group of inputs already sorted on the group key
};
/**
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=962618&r1=962617&r2=962618&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Jul 9 17:48:23 2010
@@ -268,8 +268,14 @@ public class QueryParser {
return cogroup;
}
+ else if (modifier.equalsIgnoreCase("merge")){
+ LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.MERGE);
+ cogroup.pinOption(LOCogroup.OPTION_GROUPTYPE);
+ return cogroup;
+ }
+
else{
- throw new ParseException("Only COLLECTED or REGULAR are valid GROUP modifiers.");
+ throw new ParseException("Only COLLECTED, REGULAR or MERGE are valid GROUP modifiers.");
}
}
@@ -1809,6 +1815,10 @@ LogicalOperator CogroupClause(LogicalPla
log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes.");
cogroup = parseUsingForGroupBy("regular", gis, lp);
}
+ |("\"merge\"") {
+ log.info("[WARN] Use of double-quoted string to specify hint is deprecated. Please specify hint in single quotes.");
+ cogroup = parseUsingForGroupBy("merge", gis, lp);
+ }
)])
)